This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 9a99664 [SQL] Add Data Catalog Table Provider new 9c8a8dc Merge pull request #8349 from akedin/datacatalog-table-provider 9a99664 is described below commit 9a99664822a8fd79bb5491e206159fce064c9882 Author: akedin <ke...@google.com> AuthorDate: Wed Apr 17 22:55:01 2019 -0700 [SQL] Add Data Catalog Table Provider --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 3 + sdks/java/extensions/sql/datacatalog/build.gradle | 63 +++++++++++ .../sql/example/BeamSqlDataCatalogExample.java | 103 +++++++++++++++++ .../sdk/extensions/sql/example/package-info.java | 20 ++++ .../meta/provider/datacatalog/BigQueryUtils.java | 58 ++++++++++ .../datacatalog/DataCatalogClientAdapter.java | 94 ++++++++++++++++ .../datacatalog/DataCatalogPipelineOptions.java | 35 ++++++ .../datacatalog/DataCatalogTableProvider.java | 122 +++++++++++++++++++++ .../sql/meta/provider/datacatalog/PubsubUtils.java | 53 +++++++++ .../sql/meta/provider/datacatalog/SchemaUtils.java | 98 +++++++++++++++++ .../sql/meta/provider/datacatalog/TableUtils.java | 59 ++++++++++ .../meta/provider/datacatalog/package-info.java | 20 ++++ .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 4 +- settings.gradle | 2 + 14 files changed, 733 insertions(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 74bc02f..6614954 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -350,6 +350,7 @@ class BeamModulePlugin implements Plugin<Project> { def cassandra_driver_version = "3.6.0" def generated_grpc_beta_version = "0.44.0" def generated_grpc_ga_version = "1.43.0" + def generated_grpc_dc_beta_version = "0.1.0-alpha" def google_auth_version = "0.12.0" def google_clients_version = "1.27.0" def 
google_cloud_bigdataoss_version = "1.9.16" @@ -442,6 +443,7 @@ class BeamModulePlugin implements Plugin<Project> { grpc_all : "io.grpc:grpc-all:$grpc_version", grpc_auth : "io.grpc:grpc-auth:$grpc_version", grpc_core : "io.grpc:grpc-core:$grpc_version", + grpc_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:grpc-google-cloud-datacatalog-v1beta1:$generated_grpc_dc_beta_version", grpc_google_cloud_pubsub_v1 : "com.google.api.grpc:grpc-google-cloud-pubsub-v1:$generated_grpc_ga_version", grpc_protobuf : "io.grpc:grpc-protobuf:$grpc_version", grpc_protobuf_lite : "io.grpc:grpc-protobuf-lite:$grpc_version", @@ -480,6 +482,7 @@ class BeamModulePlugin implements Plugin<Project> { powermock : "org.powermock:powermock-mockito-release-full:1.6.4", protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version", protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", + proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1:$generated_grpc_dc_beta_version", proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1:$generated_grpc_ga_version", proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1:$google_cloud_spanner_version", proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos:$proto_google_common_protos_version", diff --git a/sdks/java/extensions/sql/datacatalog/build.gradle b/sdks/java/extensions/sql/datacatalog/build.gradle new file mode 100644 index 0000000..9bf974a --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/build.gradle @@ -0,0 +1,63 @@ +import groovy.json.JsonOutput + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +applyJavaNature( + testShadowJar: true +) + +dependencies { + shadow library.java.grpc_google_cloud_datacatalog_v1beta1 + shadow library.java.proto_google_cloud_datacatalog_v1beta1 + provided project(path: ":beam-sdks-java-extensions-sql", configuration: "shadow") + + // For Data Catalog GRPC client + provided library.java.grpc_all + provided library.java.google_auth_library_oauth2_http + provided library.java.grpc_netty + provided library.java.netty_tcnative_boringssl_static + + // Dependencies for the example + provided project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow") + provided library.java.vendored_guava_20_0 + provided library.java.slf4j_api + provided library.java.slf4j_simple +} + +task runDataCatalogExample(type: JavaExec) { + description = "Run SQL example of how to use Data Catalog table provider" + main = "org.apache.beam.sdk.extensions.sql.example.BeamSqlDataCatalogExample" + classpath = sourceSets.main.runtimeClasspath + + def runner = project.findProperty('runner') ?: 'DirectRunner' + def queryString = project.findProperty('queryString') ?: '' + def outputFilePrefix = project.findProperty('outputFilePrefix') ?: '' + def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' + def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests/' + + 
args = [ + "--runner=${runner}", + "--queryString=${queryString}", + "--outputFilePrefix=${outputFilePrefix}", + "--project=${gcpProject}", + "--tempLocation=${gcsTempRoot}", + ] +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java new file mode 100644 index 0000000..8793842 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.example; + +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogTableProvider; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Example pipeline that uses Google Cloud Data Catalog to retrieve the table metadata. */ +public class BeamSqlDataCatalogExample { + + private static final Logger LOG = LoggerFactory.getLogger(BeamSqlDataCatalogExample.class); + + /** Pipeline options to specify the query and the output for the example. */ + public interface DCExamplePipelineOptions extends PipelineOptions { + + /** SQL Query. */ + @Description("Required. SQL Query containing the pipeline logic.") + @Validation.Required + String getQueryString(); + + void setQueryString(String queryString); + + /** Output file prefix. */ + @Description("Required. 
Output file prefix.") + @Validation.Required + String getOutputFilePrefix(); + + void setOutputFilePrefix(String outputPathPrefix); + } + + public static void main(String[] args) throws Exception { + LOG.info("Args: {}", Arrays.asList(args)); + DCExamplePipelineOptions options = + PipelineOptionsFactory.fromArgs(args).as(DCExamplePipelineOptions.class); + LOG.info("Query: {}\nOutput: {}", options.getQueryString(), options.getOutputFilePrefix()); + + Pipeline pipeline = Pipeline.create(options); + + validateArgs(options); + + pipeline + .apply( + "SQL Query", + SqlTransform.query(options.getQueryString()) + .withDefaultTableProvider("datacatalog", DataCatalogTableProvider.create(options))) + .apply("Convert to Strings", rowsToStrings()) + .apply("Write output", TextIO.write().to(options.getOutputFilePrefix())); + + pipeline.run().waitUntilFinish(); + } + + private static MapElements<Row, String> rowsToStrings() { + return MapElements.into(TypeDescriptor.of(String.class)) + .via( + row -> row.getValues().stream().map(String::valueOf).collect(Collectors.joining(", "))); + } + + private static void validateArgs(DCExamplePipelineOptions options) { + if (Strings.isNullOrEmpty(options.getQueryString()) + || Strings.isNullOrEmpty(options.getOutputFilePrefix())) { + String usage = + "ERROR: SQL query or output file is not specified." 
+ + "To run this example:\n" + + "./gradlew " + + ":beam-sdks-java-extensions-sql-datacatalog:runDataCatalogExample " + + "-PgcpProject=<project> " + + "-PgcsTempRoot=<GCS temp location> " + + "-PqueryString=<query> " + + "-PoutputFilePrefix=<output location> " + + "-PtempLocation=<temp GCS location for BQ export>\n\n"; + throw new IllegalArgumentException(usage); + } + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java new file mode 100644 index 0000000..7d6ff31 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Example how to use Data Catalog table provider. 
*/ +package org.apache.beam.sdk.extensions.sql.example; diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java new file mode 100644 index 0000000..c199ed0 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/BigQueryUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import com.alibaba.fastjson.JSONObject; +import com.google.cloud.datacatalog.Entry; +import java.net.URI; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.extensions.sql.meta.Table; + +/** Utils to extract BQ-specific entry information. 
*/ +class BigQueryUtils { + + private static final Pattern BQ_PATH_PATTERN = + Pattern.compile( + "/projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)"); + + static Table.Builder tableBuilder(Entry entry) { + return Table.builder() + .location(getLocation(entry)) + .properties(new JSONObject()) + .type("bigquery") + .comment(""); + } + + private static String getLocation(Entry entry) { + URI entryName = URI.create(entry.getLinkedResource()); + String bqPath = entryName.getPath(); + + Matcher bqPathMatcher = BQ_PATH_PATTERN.matcher(bqPath); + if (!bqPathMatcher.matches()) { + throw new IllegalArgumentException( + "Unsupported format for BigQuery table path: '" + entry.getLinkedResource() + "'"); + } + + String project = bqPathMatcher.group("PROJECT"); + String dataset = bqPathMatcher.group("DATASET"); + String table = bqPathMatcher.group("TABLE"); + + return String.format("%s:%s.%s", project, dataset, table); + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java new file mode 100644 index 0000000..973c3a8 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogClientAdapter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.datacatalog.DataCatalogGrpc; +import com.google.cloud.datacatalog.DataCatalogGrpc.DataCatalogBlockingStub; +import com.google.cloud.datacatalog.Entry; +import com.google.cloud.datacatalog.LookupEntryRequest; +import io.grpc.CallCredentials; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; +import io.grpc.auth.MoreCallCredentials; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.sql.meta.Table; + +/** Wraps DataCatalog GRPC client and exposes simplified APIS for Data Catalog Table Provider. */ +class DataCatalogClientAdapter { + + private DataCatalogBlockingStub dcClient; + + private DataCatalogClientAdapter(DataCatalogBlockingStub dcClient) { + this.dcClient = dcClient; + } + + /** Prod endpoint (default set in pipeline options): datacatalog.googleapis.com. 
*/ + public static DataCatalogClientAdapter withDefaultCredentials(String endpoint) + throws IOException { + return new DataCatalogClientAdapter(newClient(endpoint)); + } + + private static DataCatalogBlockingStub newClient(String endpoint) throws IOException { + Channel authedChannel = + ClientInterceptors.intercept( + ManagedChannelBuilder.forTarget(endpoint).build(), + CredentialsInterceptor.defaultCredentials()); + return DataCatalogGrpc.newBlockingStub(authedChannel); + } + + public @Nullable Table getTable(String tableName) { + Entry entry = dcClient.lookupEntry(sqlResource(tableName)); + return TableUtils.toBeamTable(tableName, entry); + } + + private LookupEntryRequest sqlResource(String tableName) { + return LookupEntryRequest.newBuilder().setSqlResource(tableName).build(); + } + + /** Provides default credentials. */ + private static final class CredentialsInterceptor implements ClientInterceptor { + + private CallCredentials callCredentials; + + private CredentialsInterceptor(CallCredentials callCredentials) { + this.callCredentials = callCredentials; + } + + public static CredentialsInterceptor defaultCredentials() throws IOException { + GoogleCredentials defaultCredentials = GoogleCredentials.getApplicationDefault(); + return of(MoreCallCredentials.from(defaultCredentials)); + } + + public static CredentialsInterceptor of(CallCredentials credentials) { + return new CredentialsInterceptor(credentials); + } + + @Override + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( + MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions.withCallCredentials(callCredentials)); + } + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java new 
file mode 100644 index 0000000..47fffcf --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogPipelineOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.Validation; + +/** Pipeline options for Data Catalog table provider. */ +public interface DataCatalogPipelineOptions extends PipelineOptions { + + /** DataCatalog endpoint. 
*/ + @Description("Data catalog endpoint.") + @Validation.Required + @Default.String("datacatalog.googleapis.com") + String getDataCatalogEndpoint(); + + void setDataCatalogEndpoint(String dataCatalogEndpoint); +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java new file mode 100644 index 0000000..44c6a78 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/DataCatalogTableProvider.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import static java.util.stream.Collectors.toMap; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider; +import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider; +import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; + +/** Uses DataCatalog to get the source type and schema for a table. */ +public class DataCatalogTableProvider implements TableProvider { + + private Map<String, TableProvider> delegateProviders; + private Map<String, Table> tableCache; + private DataCatalogClientAdapter dataCatalog; + + private DataCatalogTableProvider( + Map<String, TableProvider> delegateProviders, DataCatalogClientAdapter dataCatalogClient) { + + this.tableCache = new HashMap<>(); + this.delegateProviders = ImmutableMap.copyOf(delegateProviders); + this.dataCatalog = dataCatalogClient; + } + + public static DataCatalogTableProvider create(PipelineOptions pipelineOptions) + throws IOException { + + DataCatalogPipelineOptions options = pipelineOptions.as(DataCatalogPipelineOptions.class); + + return new DataCatalogTableProvider( + getSupportedProviders(), getDataCatalogClient(options.getDataCatalogEndpoint())); + } + + private static Map<String, TableProvider> getSupportedProviders() { + return Stream.of( + new PubsubJsonTableProvider(), new BigQueryTableProvider(), new TextTableProvider()) + 
.collect(toMap(TableProvider::getTableType, p -> p)); + } + + private static DataCatalogClientAdapter getDataCatalogClient(String endpoint) throws IOException { + return DataCatalogClientAdapter.withDefaultCredentials(endpoint); + } + + @Override + public String getTableType() { + return "google.cloud.datacatalog"; + } + + @Override + public void createTable(Table table) { + throw new UnsupportedOperationException( + "Creating tables is not supported with DataCatalog table provider."); + } + + @Override + public void dropTable(String tableName) { + throw new UnsupportedOperationException( + "Dropping tables is not supported with DataCatalog table provider"); + } + + @Override + public Map<String, Table> getTables() { + throw new UnsupportedOperationException("Loading all tables from DataCatalog is not supported"); + } + + @Override + public @Nullable Table getTable(String tableName) { + return loadTable(tableName); + } + + private @Nullable Table loadTable(String tableName) { + if (!tableCache.containsKey(tableName)) { + tableCache.put(tableName, loadTableFromDC(tableName)); + } + + return tableCache.get(tableName); + } + + private @Nullable Table loadTableFromDC(String tableName) { + try { + return dataCatalog.getTable(tableName); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) { + return null; + } + throw new RuntimeException(e); + } + } + + @Override + public BeamSqlTable buildBeamSqlTable(Table table) { + return delegateProviders.get(table.getType()).buildBeamSqlTable(table); + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java new file mode 100644 index 0000000..856eec9 --- /dev/null +++
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/PubsubUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import com.alibaba.fastjson.JSONObject; +import com.google.cloud.datacatalog.Entry; +import java.net.URI; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.extensions.sql.meta.Table; + +/** Utils to extract Pubsub-specific entry information. 
*/ +class PubsubUtils { + + private static final Pattern PS_PATH_PATTERN = + Pattern.compile("/projects/(?<PROJECT>[^/]+)/topics/(?<TOPIC>[^/]+)"); + + static Table.Builder tableBuilder(Entry entry) { + return Table.builder() + .location(getLocation(entry)) + .properties(new JSONObject()) + .type("pubsub") + .comment(""); + } + + private static String getLocation(Entry entry) { + URI entryName = URI.create(entry.getLinkedResource()); + String psPath = entryName.getPath(); + + Matcher bqPathMatcher = PS_PATH_PATTERN.matcher(psPath); + if (!bqPathMatcher.matches()) { + throw new IllegalArgumentException( + "Unsupported format for Pubsub topic: '" + entry.getLinkedResource() + "'"); + } + + return psPath.substring(1); + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java new file mode 100644 index 0000000..c28c820 --- /dev/null +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; + +import static org.apache.beam.sdk.schemas.Schema.toSchema; + +import com.google.cloud.datacatalog.ColumnSchema; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; + +class SchemaUtils { + + private static final Map<String, FieldType> FIELD_TYPES = + ImmutableMap.<String, FieldType>builder() + .put("TYPE_BOOL", FieldType.BOOLEAN) + .put("TYPE_BYTES", FieldType.BYTES) + .put("TYPE_DATE", FieldType.logicalType(new CalciteUtils.DateType())) + .put("TYPE_DATETIME", FieldType.DATETIME) + .put("TYPE_DOUBLE", FieldType.DOUBLE) + .put("TYPE_FLOAT", FieldType.DOUBLE) + .put("TYPE_INT32", FieldType.INT32) + .put("TYPE_INT64", FieldType.INT64) + .put("TYPE_STRING", FieldType.STRING) + .put("TYPE_TIME", FieldType.logicalType(new CalciteUtils.TimeType())) + .put("TYPE_TIMESTAMP", FieldType.DATETIME) + .put( + "TYPE_MAP<TYPE_STRING, TYPE_STRING>", + FieldType.map(FieldType.STRING, FieldType.STRING)) + .build(); + + /** Convert DataCatalog schema to Beam schema. 
*/ + static Schema fromDataCatalog(com.google.cloud.datacatalog.Schema dcSchema) { + return fromColumnsList(dcSchema.getColumnsList()); + } + + private static Schema fromColumnsList(List<ColumnSchema> columnsMap) { + return columnsMap.stream().map(SchemaUtils::toBeamField).collect(toSchema()); + } + + private static Field toBeamField(ColumnSchema column) { + String name = column.getColumn(); + + // basic field type + FieldType fieldType = getBeamFieldType(column); + Field field = Field.of(name, fieldType); + + // set the nullable flag, or convert to a list if repeated + if (Strings.isNullOrEmpty(column.getMode()) || "NULLABLE".equals(column.getMode())) { + field = field.withNullable(true); + } else if ("REQUIRED".equals(column.getMode())) { + field = field.withNullable(false); + } else if ("REPEATED".equals(column.getMode())) { + field = Field.of(name, FieldType.array(fieldType)); + } else { + throw new UnsupportedOperationException( + "Field mode '" + column.getMode() + "' is not supported (field '" + name + "')"); + } + + return field; + } + + private static FieldType getBeamFieldType(ColumnSchema column) { + String dcFieldType = column.getType(); + + if (FIELD_TYPES.containsKey(dcFieldType)) { + return FIELD_TYPES.get(dcFieldType); + } + + if ("TYPE_STRUCT".equals(dcFieldType)) { + Schema structSchema = fromColumnsList(column.getSubcolumnsList()); + return FieldType.row(structSchema); + } + + throw new UnsupportedOperationException( + "Field type '" + dcFieldType + "' is not supported (field '" + column.getColumn() + "')"); + } +} diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java new file mode 100644 index 0000000..ebf3b99 --- /dev/null +++ 
b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/TableUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog;
+
+import com.google.cloud.datacatalog.Entry;
+import java.net.URI;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/** Common utilities to create Beam SQL tables from Data Catalog schemas. */
+class TableUtils {
+
+  /** Creates a {@link Table.Builder} for a Data Catalog {@link Entry}. */
+  interface TableFactory {
+    Table.Builder tableBuilder(Entry entry);
+  }
+
+  /**
+   * Table factories keyed by the lower-cased authority of an entry's linked resource, for example
+   * {@code bigquery.googleapis.com}.
+   */
+  private static final Map<String, TableFactory> TABLE_FACTORIES =
+      ImmutableMap.<String, TableFactory>builder()
+          .put("bigquery.googleapis.com", BigQueryUtils::tableBuilder)
+          .put("pubsub.googleapis.com", PubsubUtils::tableBuilder)
+          .build();
+
+  private TableUtils() {}
+
+  /**
+   * Converts a Data Catalog {@link Entry} into a Beam SQL {@link Table} named {@code tableName}.
+   *
+   * @throws UnsupportedOperationException if the entry has no schema attached, or if the authority
+   *     of its linked resource has no registered {@link TableFactory}
+   */
+  static Table toBeamTable(String tableName, Entry entry) {
+    if (entry.getSchema().getColumnsCount() == 0) {
+      throw new UnsupportedOperationException(
+          "Entry doesn't have a schema. Please attach a schema to '"
+              + tableName
+              + "' in Data Catalog: "
+              + entry.toString());
+    }
+
+    TableFactory tableFactory = TABLE_FACTORIES.get(serviceOf(entry));
+    if (tableFactory == null) {
+      throw new UnsupportedOperationException(
+          "Unsupported SQL source kind: " + entry.getLinkedResource());
+    }
+
+    Schema schema = SchemaUtils.fromDataCatalog(entry.getSchema());
+    return tableFactory.tableBuilder(entry).schema(schema).name(tableName).build();
+  }
+
+  /**
+   * Returns the lower-cased authority of the entry's linked resource, or an empty string if it has
+   * none (e.g. an empty linked resource), so that such entries are reported as unsupported instead
+   * of failing with a {@link NullPointerException}.
+   */
+  private static String serviceOf(Entry entry) {
+    String authority = URI.create(entry.getLinkedResource()).getAuthority();
+    // Locale.ROOT keeps the lookup independent of the JVM default locale (e.g. Turkish dotless i).
+    return authority == null ? "" : authority.toLowerCase(Locale.ROOT);
+  }
+}
diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
new file mode 100644
index 0000000..029f3f8
--- /dev/null
+++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Table schema for Google Cloud Data Catalog.
*/ +package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index f63d8ac..2c1e138 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -430,7 +430,9 @@ public class BigQueryUtils { } private static Object convertAvroString(Object value) { - if (value instanceof org.apache.avro.util.Utf8) { + if (value == null) { + return null; + } else if (value instanceof org.apache.avro.util.Utf8) { return ((org.apache.avro.util.Utf8) value).toString(); } else if (value instanceof String) { return value; diff --git a/settings.gradle b/settings.gradle index 65e67fa..5d2dfc8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -125,6 +125,8 @@ include "beam-sdks-java-extensions-sql-shell" project(":beam-sdks-java-extensions-sql-shell").dir = file("sdks/java/extensions/sql/shell") include "beam-sdks-java-extensions-sql-hcatalog" project(":beam-sdks-java-extensions-sql-hcatalog").dir = file("sdks/java/extensions/sql/hcatalog") +include "beam-sdks-java-extensions-sql-datacatalog" +project(":beam-sdks-java-extensions-sql-datacatalog").dir = file("sdks/java/extensions/sql/datacatalog") include "beam-sdks-java-fn-execution" project(":beam-sdks-java-fn-execution").dir = file("sdks/java/fn-execution") include "beam-sdks-java-harness"