This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a99664  [SQL] Add Data Catalog Table Provider
     new 9c8a8dc  Merge pull request #8349 from 
akedin/datacatalog-table-provider
9a99664 is described below

commit 9a99664822a8fd79bb5491e206159fce064c9882
Author: akedin <ke...@google.com>
AuthorDate: Wed Apr 17 22:55:01 2019 -0700

    [SQL] Add Data Catalog Table Provider
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   3 +
 sdks/java/extensions/sql/datacatalog/build.gradle  |  63 +++++++++++
 .../sql/example/BeamSqlDataCatalogExample.java     | 103 +++++++++++++++++
 .../sdk/extensions/sql/example/package-info.java   |  20 ++++
 .../meta/provider/datacatalog/BigQueryUtils.java   |  58 ++++++++++
 .../datacatalog/DataCatalogClientAdapter.java      |  94 ++++++++++++++++
 .../datacatalog/DataCatalogPipelineOptions.java    |  35 ++++++
 .../datacatalog/DataCatalogTableProvider.java      | 122 +++++++++++++++++++++
 .../sql/meta/provider/datacatalog/PubsubUtils.java |  53 +++++++++
 .../sql/meta/provider/datacatalog/SchemaUtils.java |  98 +++++++++++++++++
 .../sql/meta/provider/datacatalog/TableUtils.java  |  59 ++++++++++
 .../meta/provider/datacatalog/package-info.java    |  20 ++++
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    |   4 +-
 settings.gradle                                    |   2 +
 14 files changed, 733 insertions(+), 1 deletion(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 74bc02f..6614954 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -350,6 +350,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def cassandra_driver_version = "3.6.0"
     def generated_grpc_beta_version = "0.44.0"
     def generated_grpc_ga_version = "1.43.0"
+    def generated_grpc_dc_beta_version = "0.1.0-alpha"
     def google_auth_version = "0.12.0"
     def google_clients_version = "1.27.0"
     def google_cloud_bigdataoss_version = "1.9.16"
@@ -442,6 +443,7 @@ class BeamModulePlugin implements Plugin<Project> {
         grpc_all                                    : 
"io.grpc:grpc-all:$grpc_version",
         grpc_auth                                   : 
"io.grpc:grpc-auth:$grpc_version",
         grpc_core                                   : 
"io.grpc:grpc-core:$grpc_version",
+        grpc_google_cloud_datacatalog_v1beta1       : 
"com.google.api.grpc:grpc-google-cloud-datacatalog-v1beta1:$generated_grpc_dc_beta_version",
         grpc_google_cloud_pubsub_v1                 : 
"com.google.api.grpc:grpc-google-cloud-pubsub-v1:$generated_grpc_ga_version",
         grpc_protobuf                               : 
"io.grpc:grpc-protobuf:$grpc_version",
         grpc_protobuf_lite                          : 
"io.grpc:grpc-protobuf-lite:$grpc_version",
@@ -480,6 +482,7 @@ class BeamModulePlugin implements Plugin<Project> {
         powermock                                   : 
"org.powermock:powermock-mockito-release-full:1.6.4",
         protobuf_java                               : 
"com.google.protobuf:protobuf-java:$protobuf_version",
         protobuf_java_util                          : 
"com.google.protobuf:protobuf-java-util:$protobuf_version",
+        proto_google_cloud_datacatalog_v1beta1      : 
"com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1:$generated_grpc_dc_beta_version",
         proto_google_cloud_pubsub_v1                : 
"com.google.api.grpc:proto-google-cloud-pubsub-v1:$generated_grpc_ga_version",
         proto_google_cloud_spanner_admin_database_v1: 
"com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1:$google_cloud_spanner_version",
         proto_google_common_protos                  : 
"com.google.api.grpc:proto-google-common-protos:$proto_google_common_protos_version",
diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle 
b/sdks/java/extensions/sql/datacatalog/build.gradle
new file mode 100644
index 0000000..9bf974a
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/build.gradle
@@ -0,0 +1,63 @@
+import groovy.json.JsonOutput
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Build for the Beam SQL Data Catalog table provider module and its example pipeline.
+plugins { id 'org.apache.beam.module' }
+
+applyJavaNature(
+  testShadowJar: true
+)
+
+dependencies {
+  // Generated Data Catalog v1beta1 gRPC stubs and protos used by the table provider.
+  shadow library.java.grpc_google_cloud_datacatalog_v1beta1
+  shadow library.java.proto_google_cloud_datacatalog_v1beta1
+  provided project(path: ":beam-sdks-java-extensions-sql", configuration: 
"shadow")
+
+  // For Data Catalog GRPC client
+  provided library.java.grpc_all
+  provided library.java.google_auth_library_oauth2_http
+  provided library.java.grpc_netty
+  provided library.java.netty_tcnative_boringssl_static
+
+  // Dependencies for the example
+  provided project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadow")
+  provided library.java.vendored_guava_20_0
+  provided library.java.slf4j_api
+  provided library.java.slf4j_simple
+}
+
+// Runs BeamSqlDataCatalogExample. Each argument can be overridden with a Gradle project
+// property: -Prunner, -PqueryString, -PoutputFilePrefix, -PgcpProject, -PgcsTempRoot.
+task runDataCatalogExample(type: JavaExec) {
+  description = "Run SQL example of how to use Data Catalog table provider"
+  main = "org.apache.beam.sdk.extensions.sql.example.BeamSqlDataCatalogExample"
+  classpath = sourceSets.main.runtimeClasspath
+
+  // queryString and outputFilePrefix default to empty strings; the example rejects empty
+  // values with a usage message, so both must be supplied explicitly.
+  def runner = project.findProperty('runner') ?: 'DirectRunner'
+  def queryString = project.findProperty('queryString') ?: ''
+  def outputFilePrefix = project.findProperty('outputFilePrefix') ?: ''
+  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+  def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests/'
+
+  // gcsTempRoot is handed to the pipeline as its --tempLocation.
+  args = [
+    "--runner=${runner}",
+    "--queryString=${queryString}",
+    "--outputFilePrefix=${outputFilePrefix}",
+    "--project=${gcpProject}",
+    "--tempLocation=${gcsTempRoot}",
+  ]
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java
new file mode 100644
index 0000000..8793842
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogTableProvider;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Example pipeline that uses Google Cloud Data Catalog to retrieve the table metadata.
+ *
+ * <p>The query is planned against a {@link DataCatalogTableProvider} registered as the default
+ * table provider under the name {@code datacatalog}. Every result row is rendered as a
+ * comma-separated line and written as text under {@code --outputFilePrefix}.
+ */
+public class BeamSqlDataCatalogExample {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlDataCatalogExample.class);
+
+  /** Pipeline options to specify the query and the output for the example. */
+  public interface DCExamplePipelineOptions extends PipelineOptions {
+
+    /** SQL Query. */
+    @Description("Required. SQL Query containing the pipeline logic.")
+    @Validation.Required
+    String getQueryString();
+
+    void setQueryString(String queryString);
+
+    /** Output file prefix. */
+    @Description("Required. Output file prefix.")
+    @Validation.Required
+    String getOutputFilePrefix();
+
+    void setOutputFilePrefix(String outputFilePrefix);
+  }
+
+  /**
+   * Parses and validates the options, then runs the pipeline until it finishes.
+   *
+   * @param args command-line arguments, e.g. {@code --queryString=... --outputFilePrefix=...}
+   * @throws IllegalArgumentException if the query or the output prefix is missing or empty
+   * @throws Exception if creating the Data Catalog provider or running the pipeline fails
+   */
+  public static void main(String[] args) throws Exception {
+    LOG.info("Args: {}", Arrays.asList(args));
+    DCExamplePipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(DCExamplePipelineOptions.class);
+    LOG.info("Query: {}\nOutput: {}", options.getQueryString(), options.getOutputFilePrefix());
+
+    // Fail fast with a usage message before building the pipeline. The Gradle task defaults
+    // both options to empty strings, so an explicit emptiness check is needed.
+    validateArgs(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    pipeline
+        .apply(
+            "SQL Query",
+            SqlTransform.query(options.getQueryString())
+                .withDefaultTableProvider("datacatalog", DataCatalogTableProvider.create(options)))
+        .apply("Convert to Strings", rowsToStrings())
+        .apply("Write output", TextIO.write().to(options.getOutputFilePrefix()));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /** Renders a row as its field values joined by {@code ", "}; null values print as "null". */
+  private static MapElements<Row, String> rowsToStrings() {
+    return MapElements.into(TypeDescriptor.of(String.class))
+        .via(
+            row -> row.getValues().stream().map(String::valueOf).collect(Collectors.joining(", ")));
+  }
+
+  /**
+   * Ensures the query and the output prefix are both non-empty.
+   *
+   * @throws IllegalArgumentException carrying usage instructions if either is null or empty
+   */
+  private static void validateArgs(DCExamplePipelineOptions options) {
+    if (Strings.isNullOrEmpty(options.getQueryString())
+        || Strings.isNullOrEmpty(options.getOutputFilePrefix())) {
+      String usage =
+          "ERROR: SQL query or output file is not specified. "
+              + "To run this example:\n"
+              + "./gradlew "
+              + ":beam-sdks-java-extensions-sql-datacatalog:runDataCatalogExample "
+              + "-PgcpProject=<project> "
+              + "-PgcsTempRoot=<temp GCS location for BQ export> "
+              + "-PqueryString=<query> "
+              + "-PoutputFilePrefix=<output location>\n\n";
+      throw new IllegalArgumentException(usage);
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
new file mode 100644
index 0000000..7d6ff31
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Example of how to use the Data Catalog table provider. */
+package org.apache.beam.sdk.extensions.sql.example;
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java
new file mode 100644
index 0000000..c199ed0
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/** Utils to extract BigQuery-specific information from Data Catalog entries. */
+final class BigQueryUtils {
+
+  /** Matches a BigQuery table resource path and captures its project, dataset and table. */
+  private static final Pattern BQ_PATH_PATTERN =
+      Pattern.compile(
+          "/projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)");
+
+  /** Static helpers only; not instantiable. */
+  private BigQueryUtils() {}
+
+  /**
+   * Starts a {@link Table.Builder} for a BigQuery-backed Data Catalog entry.
+   *
+   * <p>The builder gets type {@code "bigquery"}, empty properties, an empty comment and a location
+   * in the {@code project:dataset.table} form; other fields are left for the caller to set.
+   *
+   * @throws IllegalArgumentException if the entry's linked resource is not a BigQuery table path
+   */
+  static Table.Builder tableBuilder(Entry entry) {
+    return Table.builder()
+        .location(getLocation(entry))
+        .properties(new JSONObject())
+        .type("bigquery")
+        .comment("");
+  }
+
+  /** Converts the entry's linked resource into the {@code project:dataset.table} form. */
+  private static String getLocation(Entry entry) {
+    URI entryName = URI.create(entry.getLinkedResource());
+    String bqPath = entryName.getPath();
+
+    Matcher bqPathMatcher = BQ_PATH_PATTERN.matcher(bqPath);
+    if (!bqPathMatcher.matches()) {
+      throw new IllegalArgumentException(
+          "Unsupported format for BigQuery table path: '" + entry.getLinkedResource() + "'");
+    }
+
+    String project = bqPathMatcher.group("PROJECT");
+    String dataset = bqPathMatcher.group("DATASET");
+    String table = bqPathMatcher.group("TABLE");
+
+    return String.format("%s:%s.%s", project, dataset, table);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java
new file mode 100644
index 0000000..973c3a8
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.datacatalog.DataCatalogGrpc;
+import com.google.cloud.datacatalog.DataCatalogGrpc.DataCatalogBlockingStub;
+import com.google.cloud.datacatalog.Entry;
+import com.google.cloud.datacatalog.LookupEntryRequest;
+import io.grpc.CallCredentials;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.MethodDescriptor;
+import io.grpc.auth.MoreCallCredentials;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/**
+ * Wraps the Data Catalog gRPC client and exposes simplified APIs for the Data Catalog table
+ * provider.
+ */
+class DataCatalogClientAdapter {
+
+  private final DataCatalogBlockingStub dcClient;
+
+  private DataCatalogClientAdapter(DataCatalogBlockingStub dcClient) {
+    this.dcClient = dcClient;
+  }
+
+  /**
+   * Creates an adapter that authenticates with the application default Google credentials.
+   *
+   * @param endpoint gRPC target of the Data Catalog service; the pipeline options default it to
+   *     {@code datacatalog.googleapis.com}
+   * @throws IOException if the application default credentials cannot be loaded
+   */
+  public static DataCatalogClientAdapter withDefaultCredentials(String endpoint)
+      throws IOException {
+    return new DataCatalogClientAdapter(newClient(endpoint));
+  }
+
+  /** Builds a blocking stub whose channel attaches the default credentials to every call. */
+  private static DataCatalogBlockingStub newClient(String endpoint) throws IOException {
+    // NOTE(review): the ManagedChannel built here is never shut down and lives as long as the
+    // stub; consider exposing a close() if adapters are created repeatedly.
+    Channel authedChannel =
+        ClientInterceptors.intercept(
+            ManagedChannelBuilder.forTarget(endpoint).build(),
+            CredentialsInterceptor.defaultCredentials());
+    return DataCatalogGrpc.newBlockingStub(authedChannel);
+  }
+
+  /**
+   * Looks up a table by its SQL resource name and converts the entry to a Beam {@link Table}.
+   *
+   * @return the converted table; may be {@code null} depending on {@code TableUtils.toBeamTable}
+   * @throws io.grpc.StatusRuntimeException if the lookup RPC fails
+   */
+  public @Nullable Table getTable(String tableName) {
+    Entry entry = dcClient.lookupEntry(sqlResource(tableName));
+    return TableUtils.toBeamTable(tableName, entry);
+  }
+
+  /** Builds a lookup request keyed by the SQL resource name. */
+  private LookupEntryRequest sqlResource(String tableName) {
+    return LookupEntryRequest.newBuilder().setSqlResource(tableName).build();
+  }
+
+  /** Attaches a fixed set of call credentials to every outgoing call. */
+  private static final class CredentialsInterceptor implements ClientInterceptor {
+
+    private final CallCredentials callCredentials;
+
+    private CredentialsInterceptor(CallCredentials callCredentials) {
+      this.callCredentials = callCredentials;
+    }
+
+    /**
+     * Creates an interceptor backed by the application default Google credentials.
+     *
+     * @throws IOException if the credentials cannot be loaded
+     */
+    public static CredentialsInterceptor defaultCredentials() throws IOException {
+      GoogleCredentials defaultCredentials = GoogleCredentials.getApplicationDefault();
+      return of(MoreCallCredentials.from(defaultCredentials));
+    }
+
+    public static CredentialsInterceptor of(CallCredentials credentials) {
+      return new CredentialsInterceptor(credentials);
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+      return next.newCall(method, callOptions.withCallCredentials(callCredentials));
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java
new file mode 100644
index 0000000..47fffcf
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+
+/** Pipeline options that configure the Data Catalog table provider. */
+public interface DataCatalogPipelineOptions extends PipelineOptions {
+
+  /** Returns the Data Catalog service endpoint ({@code datacatalog.googleapis.com} by default). */
+  @Default.String("datacatalog.googleapis.com")
+  @Description("Data catalog endpoint.")
+  @Validation.Required
+  String getDataCatalogEndpoint();
+
+  /** Overrides the Data Catalog service endpoint. */
+  void setDataCatalogEndpoint(String endpoint);
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
new file mode 100644
index 0000000..44c6a78
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static java.util.stream.Collectors.toMap;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/** Uses DataCatalog to get the source type and schema for a table. */
+public class DataCatalogTableProvider implements TableProvider {
+
+  /** Providers that actually build tables, keyed by {@link TableProvider#getTableType()}. */
+  private final Map<String, TableProvider> delegateProviders;
+
+  /** Lookup results by table name; misses are stored as {@code null} values as well. */
+  private final Map<String, Table> tableCache;
+
+  private final DataCatalogClientAdapter dataCatalog;
+
+  private DataCatalogTableProvider(
+      Map<String, TableProvider> delegateProviders, DataCatalogClientAdapter dataCatalogClient) {
+    this.tableCache = new HashMap<>();
+    this.delegateProviders = ImmutableMap.copyOf(delegateProviders);
+    this.dataCatalog = dataCatalogClient;
+  }
+
+  /**
+   * Creates a provider that queries the endpoint from {@link DataCatalogPipelineOptions} using
+   * the application default credentials.
+   *
+   * @throws IOException if the credentials cannot be loaded
+   */
+  public static DataCatalogTableProvider create(PipelineOptions pipelineOptions)
+      throws IOException {
+    DataCatalogPipelineOptions options = pipelineOptions.as(DataCatalogPipelineOptions.class);
+
+    return new DataCatalogTableProvider(
+        getSupportedProviders(), getDataCatalogClient(options.getDataCatalogEndpoint()));
+  }
+
+  /** Pubsub (JSON), BigQuery and text providers, keyed by the table type each one handles. */
+  private static Map<String, TableProvider> getSupportedProviders() {
+    return Stream.of(
+            new PubsubJsonTableProvider(), new BigQueryTableProvider(), new TextTableProvider())
+        .collect(toMap(TableProvider::getTableType, p -> p));
+  }
+
+  private static DataCatalogClientAdapter getDataCatalogClient(String endpoint) throws IOException {
+    return DataCatalogClientAdapter.withDefaultCredentials(endpoint);
+  }
+
+  @Override
+  public String getTableType() {
+    return "google.cloud.datacatalog";
+  }
+
+  @Override
+  public void createTable(Table table) {
+    throw new UnsupportedOperationException(
+        "Creating tables is not supported with DataCatalog table provider.");
+  }
+
+  @Override
+  public void dropTable(String tableName) {
+    throw new UnsupportedOperationException(
+        "Dropping tables is not supported with DataCatalog table provider");
+  }
+
+  @Override
+  public Map<String, Table> getTables() {
+    throw new UnsupportedOperationException("Loading all tables from DataCatalog is not supported");
+  }
+
+  @Override
+  public @Nullable Table getTable(String tableName) {
+    return loadTable(tableName);
+  }
+
+  /** Returns the cached lookup result, querying Data Catalog on the first request for a name. */
+  private @Nullable Table loadTable(String tableName) {
+    // Deliberately not computeIfAbsent: that would not record null results, whereas this cache
+    // also remembers tables that were not found.
+    if (!tableCache.containsKey(tableName)) {
+      tableCache.put(tableName, loadTableFromDC(tableName));
+    }
+
+    return tableCache.get(tableName);
+  }
+
+  /**
+   * Fetches a table from Data Catalog.
+   *
+   * @return the table, or {@code null} if Data Catalog rejects the name with INVALID_ARGUMENT
+   * @throws RuntimeException wrapping any other gRPC failure
+   */
+  private @Nullable Table loadTableFromDC(String tableName) {
+    try {
+      return dataCatalog.getTable(tableName);
+    } catch (StatusRuntimeException e) {
+      // Compare codes, not Status objects: Status has no value equality, and a status received
+      // from the server usually carries a description, which makes it a different instance
+      // from the Status.INVALID_ARGUMENT constant.
+      if (e.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) {
+        return null;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds the table using the delegate provider registered for {@link Table#getType()}.
+   *
+   * @throws IllegalArgumentException if no delegate provider handles the table's type
+   */
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    TableProvider delegate = delegateProviders.get(table.getType());
+    if (delegate == null) {
+      throw new IllegalArgumentException(
+          "Unsupported table type '"
+              + table.getType()
+              + "'. Supported types: "
+              + delegateProviders.keySet());
+    }
+    return delegate.buildBeamSqlTable(table);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java
new file mode 100644
index 0000000..856eec9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+
+/** Utils to extract Pubsub-specific information from Data Catalog entries. */
+final class PubsubUtils {
+
+  /** Matches a Pub/Sub topic resource path and captures its project and topic. */
+  private static final Pattern PS_PATH_PATTERN =
+      Pattern.compile("/projects/(?<PROJECT>[^/]+)/topics/(?<TOPIC>[^/]+)");
+
+  /** Static helpers only; not instantiable. */
+  private PubsubUtils() {}
+
+  /**
+   * Starts a {@link Table.Builder} for a Pub/Sub-backed Data Catalog entry.
+   *
+   * <p>The builder gets type {@code "pubsub"}, empty properties, an empty comment and the topic
+   * path as its location; other fields are left for the caller to set.
+   *
+   * @throws IllegalArgumentException if the entry's linked resource is not a Pub/Sub topic path
+   */
+  static Table.Builder tableBuilder(Entry entry) {
+    return Table.builder()
+        .location(getLocation(entry))
+        .properties(new JSONObject())
+        .type("pubsub")
+        .comment("");
+  }
+
+  /** Returns the topic path as {@code projects/<project>/topics/<topic>} (no leading slash). */
+  private static String getLocation(Entry entry) {
+    URI entryName = URI.create(entry.getLinkedResource());
+    String psPath = entryName.getPath();
+
+    Matcher psPathMatcher = PS_PATH_PATTERN.matcher(psPath);
+    if (!psPathMatcher.matches()) {
+      throw new IllegalArgumentException(
+          "Unsupported format for Pubsub topic: '" + entry.getLinkedResource() + "'");
+    }
+
+    // The match guarantees a leading '/', which is dropped to yield a relative topic path.
+    return psPath.substring(1);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
new file mode 100644
index 0000000..c28c820
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+
+import com.google.cloud.datacatalog.ColumnSchema;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+class SchemaUtils {
+
+  private static final Map<String, FieldType> FIELD_TYPES =
+      ImmutableMap.<String, FieldType>builder()
+          .put("TYPE_BOOL", FieldType.BOOLEAN)
+          .put("TYPE_BYTES", FieldType.BYTES)
+          .put("TYPE_DATE", FieldType.logicalType(new CalciteUtils.DateType()))
+          .put("TYPE_DATETIME", FieldType.DATETIME)
+          .put("TYPE_DOUBLE", FieldType.DOUBLE)
+          .put("TYPE_FLOAT", FieldType.DOUBLE)
+          .put("TYPE_INT32", FieldType.INT32)
+          .put("TYPE_INT64", FieldType.INT64)
+          .put("TYPE_STRING", FieldType.STRING)
+          .put("TYPE_TIME", FieldType.logicalType(new CalciteUtils.TimeType()))
+          .put("TYPE_TIMESTAMP", FieldType.DATETIME)
+          .put(
+              "TYPE_MAP<TYPE_STRING, TYPE_STRING>",
+              FieldType.map(FieldType.STRING, FieldType.STRING))
+          .build();
+
+  /** Convert DataCatalog schema to Beam schema. */
+  static Schema fromDataCatalog(com.google.cloud.datacatalog.Schema dcSchema) {
+    return fromColumnsList(dcSchema.getColumnsList());
+  }
+
+  private static Schema fromColumnsList(List<ColumnSchema> columnsMap) {
+    return 
columnsMap.stream().map(SchemaUtils::toBeamField).collect(toSchema());
+  }
+
+  private static Field toBeamField(ColumnSchema column) {
+    String name = column.getColumn();
+
+    // basic field type
+    FieldType fieldType = getBeamFieldType(column);
+    Field field = Field.of(name, fieldType);
+
+    // set the nullable flag, or convert to a list if repeated
+    if (Strings.isNullOrEmpty(column.getMode()) || 
"NULLABLE".equals(column.getMode())) {
+      field = field.withNullable(true);
+    } else if ("REQUIRED".equals(column.getMode())) {
+      field = field.withNullable(false);
+    } else if ("REPEATED".equals(column.getMode())) {
+      field = Field.of(name, FieldType.array(fieldType));
+    } else {
+      throw new UnsupportedOperationException(
+          "Field mode '" + column.getMode() + "' is not supported (field '" + 
name + "')");
+    }
+
+    return field;
+  }
+
+  private static FieldType getBeamFieldType(ColumnSchema column) {
+    String dcFieldType = column.getType();
+
+    if (FIELD_TYPES.containsKey(dcFieldType)) {
+      return FIELD_TYPES.get(dcFieldType);
+    }
+
+    if ("TYPE_STRUCT".equals(dcFieldType)) {
+      Schema structSchema = fromColumnsList(column.getSubcolumnsList());
+      return FieldType.row(structSchema);
+    }
+
+    throw new UnsupportedOperationException(
+        "Field type '" + dcFieldType + "' is not supported (field '" + 
column.getColumn() + "')");
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
new file mode 100644
index 0000000..ebf3b99
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/** Common utilities to create Beam SQL tables from Data Catalog schemas. */
+class TableUtils {
+
+  /** Creates a partially-populated {@link Table.Builder} for entries of one linked service. */
+  @FunctionalInterface
+  interface TableFactory {
+    Table.Builder tableBuilder(Entry entry);
+  }
+
+  /** Table factories keyed by the lower-cased authority of the entry's linked resource URI. */
+  private static final Map<String, TableFactory> TABLE_FACTORIES =
+      ImmutableMap.<String, TableFactory>builder()
+          .put("bigquery.googleapis.com", BigQueryUtils::tableBuilder)
+          .put("pubsub.googleapis.com", PubsubUtils::tableBuilder)
+          .build();
+
+  /**
+   * Converts a Data Catalog entry into a Beam SQL {@link Table}.
+   *
+   * @param tableName name to give the resulting table
+   * @param entry Data Catalog entry with a non-empty schema and a linked resource on a supported
+   *     service
+   * @return the table, with its schema converted by {@link SchemaUtils#fromDataCatalog}
+   * @throws UnsupportedOperationException if the entry has no schema columns, or its linked
+   *     resource has no authority or names an unsupported service
+   */
+  static Table toBeamTable(String tableName, Entry entry) {
+    if (entry.getSchema().getColumnsCount() == 0) {
+      throw new UnsupportedOperationException(
+          "Entry doesn't have a schema. Please attach a schema to '"
+              + tableName
+              + "' in Data Catalog: "
+              + entry.toString());
+    }
+
+    TableFactory tableFactory = factoryFor(entry.getLinkedResource());
+    Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());
+    return tableFactory.tableBuilder(entry).schema(schema).name(tableName).build();
+  }
+
+  /** Looks up the factory for the service named by the authority of {@code linkedResource}. */
+  private static TableFactory factoryFor(String linkedResource) {
+    String authority = URI.create(linkedResource).getAuthority();
+    if (authority != null) {
+      // Locale.ROOT keeps the lookup key stable regardless of the JVM default locale (e.g. tr).
+      TableFactory factory = TABLE_FACTORIES.get(authority.toLowerCase(java.util.Locale.ROOT));
+      if (factory != null) {
+        return factory;
+      }
+    }
+    // Covers both an unknown service and a URI without an authority (no "//host" part), which
+    // would otherwise fail with a NullPointerException.
+    throw new UnsupportedOperationException("Unsupported SQL source kind: " + linkedResource);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
new file mode 100644
index 0000000..029f3f8
--- /dev/null
+++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table schema for Google Cloud Data Catalog. */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index f63d8ac..2c1e138 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -430,7 +430,9 @@ public class BigQueryUtils {
   }
 
   private static Object convertAvroString(Object value) {
-    if (value instanceof org.apache.avro.util.Utf8) {
+    if (value == null) {
+      return null;
+    } else if (value instanceof org.apache.avro.util.Utf8) {
       return ((org.apache.avro.util.Utf8) value).toString();
     } else if (value instanceof String) {
       return value;
diff --git a/settings.gradle b/settings.gradle
index 65e67fa..5d2dfc8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -125,6 +125,8 @@ include "beam-sdks-java-extensions-sql-shell"
 project(":beam-sdks-java-extensions-sql-shell").dir = 
file("sdks/java/extensions/sql/shell")
 include "beam-sdks-java-extensions-sql-hcatalog"
 project(":beam-sdks-java-extensions-sql-hcatalog").dir = 
file("sdks/java/extensions/sql/hcatalog")
+include "beam-sdks-java-extensions-sql-datacatalog"
+project(":beam-sdks-java-extensions-sql-datacatalog").dir = 
file("sdks/java/extensions/sql/datacatalog")
 include "beam-sdks-java-fn-execution"
 project(":beam-sdks-java-fn-execution").dir = file("sdks/java/fn-execution")
 include "beam-sdks-java-harness"

Reply via email to