This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9a99664 [SQL] Add Data Catalog Table Provider
new 9c8a8dc Merge pull request #8349 from
akedin/datacatalog-table-provider
9a99664 is described below
commit 9a99664822a8fd79bb5491e206159fce064c9882
Author: akedin <[email protected]>
AuthorDate: Wed Apr 17 22:55:01 2019 -0700
[SQL] Add Data Catalog Table Provider
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 3 +
sdks/java/extensions/sql/datacatalog/build.gradle | 63 +++++++++++
.../sql/example/BeamSqlDataCatalogExample.java | 103 +++++++++++++++++
.../sdk/extensions/sql/example/package-info.java | 20 ++++
.../meta/provider/datacatalog/BigQueryUtils.java | 58 ++++++++++
.../datacatalog/DataCatalogClientAdapter.java | 94 ++++++++++++++++
.../datacatalog/DataCatalogPipelineOptions.java | 35 ++++++
.../datacatalog/DataCatalogTableProvider.java | 122 +++++++++++++++++++++
.../sql/meta/provider/datacatalog/PubsubUtils.java | 53 +++++++++
.../sql/meta/provider/datacatalog/SchemaUtils.java | 98 +++++++++++++++++
.../sql/meta/provider/datacatalog/TableUtils.java | 59 ++++++++++
.../meta/provider/datacatalog/package-info.java | 20 ++++
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 4 +-
settings.gradle | 2 +
14 files changed, 733 insertions(+), 1 deletion(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 74bc02f..6614954 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -350,6 +350,7 @@ class BeamModulePlugin implements Plugin<Project> {
def cassandra_driver_version = "3.6.0"
def generated_grpc_beta_version = "0.44.0"
def generated_grpc_ga_version = "1.43.0"
+ def generated_grpc_dc_beta_version = "0.1.0-alpha"
def google_auth_version = "0.12.0"
def google_clients_version = "1.27.0"
def google_cloud_bigdataoss_version = "1.9.16"
@@ -442,6 +443,7 @@ class BeamModulePlugin implements Plugin<Project> {
grpc_all :
"io.grpc:grpc-all:$grpc_version",
grpc_auth :
"io.grpc:grpc-auth:$grpc_version",
grpc_core :
"io.grpc:grpc-core:$grpc_version",
+ grpc_google_cloud_datacatalog_v1beta1 :
"com.google.api.grpc:grpc-google-cloud-datacatalog-v1beta1:$generated_grpc_dc_beta_version",
grpc_google_cloud_pubsub_v1 :
"com.google.api.grpc:grpc-google-cloud-pubsub-v1:$generated_grpc_ga_version",
grpc_protobuf :
"io.grpc:grpc-protobuf:$grpc_version",
grpc_protobuf_lite :
"io.grpc:grpc-protobuf-lite:$grpc_version",
@@ -480,6 +482,7 @@ class BeamModulePlugin implements Plugin<Project> {
powermock :
"org.powermock:powermock-mockito-release-full:1.6.4",
protobuf_java :
"com.google.protobuf:protobuf-java:$protobuf_version",
protobuf_java_util :
"com.google.protobuf:protobuf-java-util:$protobuf_version",
+ proto_google_cloud_datacatalog_v1beta1 :
"com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1:$generated_grpc_dc_beta_version",
proto_google_cloud_pubsub_v1 :
"com.google.api.grpc:proto-google-cloud-pubsub-v1:$generated_grpc_ga_version",
proto_google_cloud_spanner_admin_database_v1:
"com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1:$google_cloud_spanner_version",
proto_google_common_protos :
"com.google.api.grpc:proto-google-common-protos:$proto_google_common_protos_version",
diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle
b/sdks/java/extensions/sql/datacatalog/build.gradle
new file mode 100644
index 0000000..9bf974a
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/build.gradle
@@ -0,0 +1,63 @@
+import groovy.json.JsonOutput
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(
+ testShadowJar: true
+)
+
+dependencies {
+ shadow library.java.grpc_google_cloud_datacatalog_v1beta1
+ shadow library.java.proto_google_cloud_datacatalog_v1beta1
+ provided project(path: ":beam-sdks-java-extensions-sql", configuration:
"shadow")
+
+ // For Data Catalog GRPC client
+ provided library.java.grpc_all
+ provided library.java.google_auth_library_oauth2_http
+ provided library.java.grpc_netty
+ provided library.java.netty_tcnative_boringssl_static
+
+ // Dependencies for the example
+ provided project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadow")
+ provided library.java.vendored_guava_20_0
+ provided library.java.slf4j_api
+ provided library.java.slf4j_simple
+}
+
+task runDataCatalogExample(type: JavaExec) {
+ description = "Run SQL example of how to use Data Catalog table provider"
+ main = "org.apache.beam.sdk.extensions.sql.example.BeamSqlDataCatalogExample"
+ classpath = sourceSets.main.runtimeClasspath
+
+ def runner = project.findProperty('runner') ?: 'DirectRunner'
+ def queryString = project.findProperty('queryString') ?: ''
+ def outputFilePrefix = project.findProperty('outputFilePrefix') ?: ''
+ def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+ def gcsTempRoot = project.findProperty('gcsTempRoot') ?:
'gs://temp-storage-for-end-to-end-tests/'
+
+ args = [
+ "--runner=${runner}",
+ "--queryString=${queryString}",
+ "--outputFilePrefix=${outputFilePrefix}",
+ "--project=${gcpProject}",
+ "--tempLocation=${gcsTempRoot}",
+ ]
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java
new file mode 100644
index 0000000..8793842
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogTableProvider;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Example pipeline that uses Google Cloud Data Catalog to retrieve the table
metadata. */
+public class BeamSqlDataCatalogExample {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BeamSqlDataCatalogExample.class);
+
+ /** Pipeline options to specify the query and the output for the example. */
+ public interface DCExamplePipelineOptions extends PipelineOptions {
+
+ /** SQL Query. */
+ @Description("Required. SQL Query containing the pipeline logic.")
+ @Validation.Required
+ String getQueryString();
+
+ void setQueryString(String queryString);
+
+ /** Output file prefix. */
+ @Description("Required. Output file prefix.")
+ @Validation.Required
+ String getOutputFilePrefix();
+
+ void setOutputFilePrefix(String outputPathPrefix);
+ }
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("Args: {}", Arrays.asList(args));
+ DCExamplePipelineOptions options =
+
PipelineOptionsFactory.fromArgs(args).as(DCExamplePipelineOptions.class);
+ LOG.info("Query: {}\nOutput: {}", options.getQueryString(),
options.getOutputFilePrefix());
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ validateArgs(options);
+
+ pipeline
+ .apply(
+ "SQL Query",
+ SqlTransform.query(options.getQueryString())
+ .withDefaultTableProvider("datacatalog",
DataCatalogTableProvider.create(options)))
+ .apply("Convert to Strings", rowsToStrings())
+ .apply("Write output",
TextIO.write().to(options.getOutputFilePrefix()));
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ private static MapElements<Row, String> rowsToStrings() {
+ return MapElements.into(TypeDescriptor.of(String.class))
+ .via(
+ row ->
row.getValues().stream().map(String::valueOf).collect(Collectors.joining(",
")));
+ }
+
+  /**
+   * Fails fast when the query or the output file prefix is missing.
+   *
+   * @param options parsed example options
+   * @throws IllegalArgumentException carrying usage instructions when either the query string or
+   *     the output file prefix is null or empty
+   */
+  private static void validateArgs(DCExamplePipelineOptions options) {
+    if (Strings.isNullOrEmpty(options.getQueryString())
+        || Strings.isNullOrEmpty(options.getOutputFilePrefix())) {
+      // The -P properties listed here are the ones the runDataCatalogExample Gradle task reads;
+      // gcsTempRoot is what the task forwards as --tempLocation.
+      String usage =
+          "ERROR: SQL query or output file is not specified.\n"
+              + "To run this example:\n"
+              + "./gradlew "
+              + ":beam-sdks-java-extensions-sql-datacatalog:runDataCatalogExample "
+              + "-PgcpProject=<project> "
+              + "-PgcsTempRoot=<temp GCS location for BQ export> "
+              + "-PqueryString=<query> "
+              + "-PoutputFilePrefix=<output location>\n\n";
+      throw new IllegalArgumentException(usage);
+    }
+  }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
new file mode 100644
index 0000000..7d6ff31
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Example of how to use the Data Catalog table provider. */
+package org.apache.beam.sdk.extensions.sql.example;
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java
new file mode 100644
index 0000000..c199ed0
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/** Utils to extract BQ-specific entry information. */
+class BigQueryUtils {
+
+ private static final Pattern BQ_PATH_PATTERN =
+ Pattern.compile(
+
"/projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)");
+
+ static Table.Builder tableBuilder(Entry entry) {
+ return Table.builder()
+ .location(getLocation(entry))
+ .properties(new JSONObject())
+ .type("bigquery")
+ .comment("");
+ }
+
+ private static String getLocation(Entry entry) {
+ URI entryName = URI.create(entry.getLinkedResource());
+ String bqPath = entryName.getPath();
+
+ Matcher bqPathMatcher = BQ_PATH_PATTERN.matcher(bqPath);
+ if (!bqPathMatcher.matches()) {
+ throw new IllegalArgumentException(
+ "Unsupported format for BigQuery table path: '" +
entry.getLinkedResource() + "'");
+ }
+
+ String project = bqPathMatcher.group("PROJECT");
+ String dataset = bqPathMatcher.group("DATASET");
+ String table = bqPathMatcher.group("TABLE");
+
+ return String.format("%s:%s.%s", project, dataset, table);
+ }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java
new file mode 100644
index 0000000..973c3a8
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.datacatalog.DataCatalogGrpc;
+import com.google.cloud.datacatalog.DataCatalogGrpc.DataCatalogBlockingStub;
+import com.google.cloud.datacatalog.Entry;
+import com.google.cloud.datacatalog.LookupEntryRequest;
+import io.grpc.CallCredentials;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.auth.MoreCallCredentials;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/** Wraps DataCatalog GRPC client and exposes simplified APIs for Data Catalog Table Provider. */
+class DataCatalogClientAdapter {
+
+ private DataCatalogBlockingStub dcClient;
+
+ private DataCatalogClientAdapter(DataCatalogBlockingStub dcClient) {
+ this.dcClient = dcClient;
+ }
+
+ /** Prod endpoint (default set in pipeline options):
datacatalog.googleapis.com. */
+ public static DataCatalogClientAdapter withDefaultCredentials(String
endpoint)
+ throws IOException {
+ return new DataCatalogClientAdapter(newClient(endpoint));
+ }
+
+ private static DataCatalogBlockingStub newClient(String endpoint) throws
IOException {
+ Channel authedChannel =
+ ClientInterceptors.intercept(
+ ManagedChannelBuilder.forTarget(endpoint).build(),
+ CredentialsInterceptor.defaultCredentials());
+ return DataCatalogGrpc.newBlockingStub(authedChannel);
+ }
+
+ public @Nullable Table getTable(String tableName) {
+ Entry entry = dcClient.lookupEntry(sqlResource(tableName));
+ return TableUtils.toBeamTable(tableName, entry);
+ }
+
+ private LookupEntryRequest sqlResource(String tableName) {
+ return LookupEntryRequest.newBuilder().setSqlResource(tableName).build();
+ }
+
+ /** Provides default credentials. */
+ private static final class CredentialsInterceptor implements
ClientInterceptor {
+
+ private CallCredentials callCredentials;
+
+ private CredentialsInterceptor(CallCredentials callCredentials) {
+ this.callCredentials = callCredentials;
+ }
+
+ public static CredentialsInterceptor defaultCredentials() throws
IOException {
+ GoogleCredentials defaultCredentials =
GoogleCredentials.getApplicationDefault();
+ return of(MoreCallCredentials.from(defaultCredentials));
+ }
+
+ public static CredentialsInterceptor of(CallCredentials credentials) {
+ return new CredentialsInterceptor(credentials);
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel
next) {
+ return next.newCall(method,
callOptions.withCallCredentials(callCredentials));
+ }
+ }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java
new file mode 100644
index 0000000..47fffcf
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+
+/** Pipeline options for Data Catalog table provider. */
+public interface DataCatalogPipelineOptions extends PipelineOptions {
+
+ /** DataCatalog endpoint. */
+ @Description("Data catalog endpoint.")
+ @Validation.Required
+ @Default.String("datacatalog.googleapis.com")
+ String getDataCatalogEndpoint();
+
+ void setDataCatalogEndpoint(String dataCatalogEndpoint);
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
new file mode 100644
index 0000000..44c6a78
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static java.util.stream.Collectors.toMap;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/** Uses DataCatalog to get the source type and schema for a table. */
+public class DataCatalogTableProvider implements TableProvider {
+
+ private Map<String, TableProvider> delegateProviders;
+ private Map<String, Table> tableCache;
+ private DataCatalogClientAdapter dataCatalog;
+
+ private DataCatalogTableProvider(
+ Map<String, TableProvider> delegateProviders, DataCatalogClientAdapter
dataCatalogClient) {
+
+ this.tableCache = new HashMap<>();
+ this.delegateProviders = ImmutableMap.copyOf(delegateProviders);
+ this.dataCatalog = dataCatalogClient;
+ }
+
+ public static DataCatalogTableProvider create(PipelineOptions
pipelineOptions)
+ throws IOException {
+
+ DataCatalogPipelineOptions options =
pipelineOptions.as(DataCatalogPipelineOptions.class);
+
+ return new DataCatalogTableProvider(
+ getSupportedProviders(),
getDataCatalogClient(options.getDataCatalogEndpoint()));
+ }
+
+ private static Map<String, TableProvider> getSupportedProviders() {
+ return Stream.of(
+ new PubsubJsonTableProvider(), new BigQueryTableProvider(), new
TextTableProvider())
+ .collect(toMap(TableProvider::getTableType, p -> p));
+ }
+
+ private static DataCatalogClientAdapter getDataCatalogClient(String
endpoint) throws IOException {
+ return DataCatalogClientAdapter.withDefaultCredentials(endpoint);
+ }
+
+ @Override
+ public String getTableType() {
+ return "google.cloud.datacatalog";
+ }
+
+ @Override
+ public void createTable(Table table) {
+ throw new UnsupportedOperationException(
+ "Creating tables is not supported with DataCatalog table provider.");
+ }
+
+ @Override
+ public void dropTable(String tableName) {
+ throw new UnsupportedOperationException(
+ "Dropping tables is not supported with DataCatalog table provider");
+ }
+
+ @Override
+ public Map<String, Table> getTables() {
+ throw new UnsupportedOperationException("Loading all tables from
DataCatalog is not supported");
+ }
+
+ @Override
+ public @Nullable Table getTable(String tableName) {
+ return loadTable(tableName);
+ }
+
+ private @Nullable Table loadTable(String tableName) {
+ if (!tableCache.containsKey(tableName)) {
+ tableCache.put(tableName, loadTableFromDC(tableName));
+ }
+
+ return tableCache.get(tableName);
+ }
+
+  /**
+   * Looks up {@code tableName} through the Data Catalog client.
+   *
+   * @param tableName name handed to {@code dataCatalog.getTable}
+   * @return the table built from the Data Catalog entry, or {@code null} when the lookup fails
+   *     with an {@code INVALID_ARGUMENT} status code
+   * @throws RuntimeException wrapping the {@link StatusRuntimeException} for any other status
+   */
+  private Table loadTableFromDC(String tableName) {
+    try {
+      return dataCatalog.getTable(tableName);
+    } catch (StatusRuntimeException e) {
+      // Compare status codes, not Status objects: io.grpc.Status has no value equality, so a
+      // status that carries a description or cause never equals the Status.INVALID_ARGUMENT
+      // constant and the null branch would not be taken.
+      if (e.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) {
+        return null;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+ @Override
+ public BeamSqlTable buildBeamSqlTable(Table table) {
+ return delegateProviders.get(table.getType()).buildBeamSqlTable(table);
+ }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java
new file mode 100644
index 0000000..856eec9
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/** Utils to extract Pubsub-specific entry information. */
+class PubsubUtils {
+
+ private static final Pattern PS_PATH_PATTERN =
+ Pattern.compile("/projects/(?<PROJECT>[^/]+)/topics/(?<TOPIC>[^/]+)");
+
+ static Table.Builder tableBuilder(Entry entry) {
+ return Table.builder()
+ .location(getLocation(entry))
+ .properties(new JSONObject())
+ .type("pubsub")
+ .comment("");
+ }
+
+  /**
+   * Derives the table location from the entry's linked resource.
+   *
+   * @param entry Data Catalog entry whose linked resource is parsed as a URI
+   * @return the URI path without its leading {@code '/'}
+   * @throws IllegalArgumentException if the path does not match {@code PS_PATH_PATTERN}
+   */
+  private static String getLocation(Entry entry) {
+    String topicPath = URI.create(entry.getLinkedResource()).getPath();
+
+    Matcher topicPathMatcher = PS_PATH_PATTERN.matcher(topicPath);
+    if (topicPathMatcher.matches()) {
+      // Strip the leading '/' of the URI path.
+      return topicPath.substring(1);
+    }
+
+    throw new IllegalArgumentException(
+        "Unsupported format for Pubsub topic: '" + entry.getLinkedResource() + "'");
+  }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
new file mode 100644
index 0000000..c28c820
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.google.cloud.datacatalog.ColumnSchema;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+class SchemaUtils {
+
+ private static final Map<String, FieldType> FIELD_TYPES =
+ ImmutableMap.<String, FieldType>builder()
+ .put("TYPE_BOOL", FieldType.BOOLEAN)
+ .put("TYPE_BYTES", FieldType.BYTES)
+ .put("TYPE_DATE", FieldType.logicalType(new CalciteUtils.DateType()))
+ .put("TYPE_DATETIME", FieldType.DATETIME)
+ .put("TYPE_DOUBLE", FieldType.DOUBLE)
+ .put("TYPE_FLOAT", FieldType.DOUBLE)
+ .put("TYPE_INT32", FieldType.INT32)
+ .put("TYPE_INT64", FieldType.INT64)
+ .put("TYPE_STRING", FieldType.STRING)
+ .put("TYPE_TIME", FieldType.logicalType(new CalciteUtils.TimeType()))
+ .put("TYPE_TIMESTAMP", FieldType.DATETIME)
+ .put(
+ "TYPE_MAP<TYPE_STRING, TYPE_STRING>",
+ FieldType.map(FieldType.STRING, FieldType.STRING))
+ .build();
+
+ /** Convert DataCatalog schema to Beam schema. */
+ static Schema fromDataCatalog(com.google.cloud.datacatalog.Schema dcSchema) {
+ return fromColumnsList(dcSchema.getColumnsList());
+ }
+
+ private static Schema fromColumnsList(List<ColumnSchema> columnsMap) {
+ return
columnsMap.stream().map(SchemaUtils::toBeamField).collect(toSchema());
+ }
+
+ private static Field toBeamField(ColumnSchema column) {
+ String name = column.getColumn();
+
+ // basic field type
+ FieldType fieldType = getBeamFieldType(column);
+ Field field = Field.of(name, fieldType);
+
+ // set the nullable flag, or convert to a list if repeated
+ if (Strings.isNullOrEmpty(column.getMode()) ||
"NULLABLE".equals(column.getMode())) {
+ field = field.withNullable(true);
+ } else if ("REQUIRED".equals(column.getMode())) {
+ field = field.withNullable(false);
+ } else if ("REPEATED".equals(column.getMode())) {
+ field = Field.of(name, FieldType.array(fieldType));
+ } else {
+ throw new UnsupportedOperationException(
+ "Field mode '" + column.getMode() + "' is not supported (field '" +
name + "')");
+ }
+
+ return field;
+ }
+
+ private static FieldType getBeamFieldType(ColumnSchema column) {
+ String dcFieldType = column.getType();
+
+ if (FIELD_TYPES.containsKey(dcFieldType)) {
+ return FIELD_TYPES.get(dcFieldType);
+ }
+
+ if ("TYPE_STRUCT".equals(dcFieldType)) {
+ Schema structSchema = fromColumnsList(column.getSubcolumnsList());
+ return FieldType.row(structSchema);
+ }
+
+ throw new UnsupportedOperationException(
+ "Field type '" + dcFieldType + "' is not supported (field '" +
column.getColumn() + "')");
+ }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
new file mode 100644
index 0000000..ebf3b99
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/** Common utilities to create Beam SQL tables from Data Catalog entries and their schemas. */
+class TableUtils {
+
+  /**
+   * Produces a {@link Table.Builder} for a Data Catalog entry. {@link #toBeamTable} sets the
+   * schema and the table name on the returned builder before building the table.
+   */
+  @FunctionalInterface
+  interface TableFactory {
+    Table.Builder tableBuilder(Entry entry);
+  }
+
+  /** Table factories keyed by the lower-case authority (host) of the entry's linked resource. */
+  private static final Map<String, TableFactory> TABLE_FACTORIES =
+      ImmutableMap.<String, TableFactory>builder()
+          .put("bigquery.googleapis.com", BigQueryUtils::tableBuilder)
+          .put("pubsub.googleapis.com", PubsubUtils::tableBuilder)
+          .build();
+
+  /**
+   * Creates a Beam SQL {@link Table} named {@code tableName} from a Data Catalog entry.
+   *
+   * <p>The factory is chosen by the authority part of the entry's linked resource URI, and the
+   * table schema is converted from the schema attached to the entry.
+   *
+   * @param tableName name given to the resulting table
+   * @param entry Data Catalog entry describing the source
+   * @return the table built for the entry
+   * @throws UnsupportedOperationException if the entry has no schema columns, or if the linked
+   *     resource has no authority or names a service without a registered factory
+   */
+  static Table toBeamTable(String tableName, Entry entry) {
+    if (entry.getSchema().getColumnsCount() == 0) {
+      throw new UnsupportedOperationException(
+          "Entry doesn't have a schema. Please attach a schema to '"
+              + tableName
+              + "' in Data Catalog: "
+              + entry.toString());
+    }
+
+    // The authority is null when the linked resource has no "//host" part (e.g. it is empty).
+    String authority = URI.create(entry.getLinkedResource()).getAuthority();
+
+    // Host names are ASCII; lower-case them independently of the default locale so that, for
+    // example, a Turkish locale does not turn 'I' into a dotless 'i' and break the lookup.
+    TableFactory factory =
+        authority == null
+            ? null
+            : TABLE_FACTORIES.get(authority.toLowerCase(java.util.Locale.ROOT));
+
+    if (factory == null) {
+      throw new UnsupportedOperationException(
+          "Unsupported SQL source kind: " + entry.getLinkedResource());
+    }
+
+    Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());
+    return factory.tableBuilder(entry).schema(schema).name(tableName).build();
+  }
+}
diff --git
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
new file mode 100644
index 0000000..029f3f8
--- /dev/null
+++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Beam SQL table provider backed by Google Cloud Data Catalog. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index f63d8ac..2c1e138 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -430,7 +430,9 @@ public class BigQueryUtils {
}
private static Object convertAvroString(Object value) {
- if (value instanceof org.apache.avro.util.Utf8) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof org.apache.avro.util.Utf8) {
return ((org.apache.avro.util.Utf8) value).toString();
} else if (value instanceof String) {
return value;
diff --git a/settings.gradle b/settings.gradle
index 65e67fa..5d2dfc8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -125,6 +125,8 @@ include "beam-sdks-java-extensions-sql-shell"
project(":beam-sdks-java-extensions-sql-shell").dir =
file("sdks/java/extensions/sql/shell")
include "beam-sdks-java-extensions-sql-hcatalog"
project(":beam-sdks-java-extensions-sql-hcatalog").dir =
file("sdks/java/extensions/sql/hcatalog")
+include "beam-sdks-java-extensions-sql-datacatalog"
+project(":beam-sdks-java-extensions-sql-datacatalog").dir =
file("sdks/java/extensions/sql/datacatalog")
include "beam-sdks-java-fn-execution"
project(":beam-sdks-java-fn-execution").dir = file("sdks/java/fn-execution")
include "beam-sdks-java-harness"