chamikaramj commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1137821416
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1514,6 +1531,42 @@ def process(self, element, *side_inputs):
yield (self.destination(element, *side_inputs), element)
+def beam_row_from_dict(row: dict, schema):
Review Comment:
Let's make sure this util is extensively covered by unit tests for the
various types, etc.
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
_DATASET_PATTERN = r'\w{1,1024}'
_TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+ "STRING": str,
+ "BOOL": bool,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ "INT64": np.int64,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "FLOAT": np.float64,
+ "NUMERIC": decimal.Decimal,
+ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
Review Comment:
Seems like this doesn't cover all BQ types? For example, Geography types
[1]. We should at least add a TODO to fix this.
[1]
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#geography_type
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
Review Comment:
Let's also hide this transform from the public API by renaming to
"_StorageWriteToBigQuery".
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using Storage API.
+
+ Experimental; no backwards compatibility guarantees.
+ """
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "FailedRows"
+ FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=BigQueryDisposition.WRITE_APPEND,
+ triggering_frequency=0,
+ use_at_least_once=False,
+ expansion_service=None):
+ """Initialize a StorageWriteToBigQuery transform.
+
+ :param table:
+ Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+ :param create_disposition:
+ String specifying the strategy to take when the table doesn't
+ exist. Possible values are:
+ * ``'CREATE_IF_NEEDED'``: create if does not exist.
+ * ``'CREATE_NEVER'``: fail the write if does not exist.
+ :param write_disposition:
+ String specifying the strategy to take when the table already
+ contains data. Possible values are:
+ * ``'WRITE_TRUNCATE'``: delete existing rows.
+ * ``'WRITE_APPEND'``: add to existing rows.
+ * ``'WRITE_EMPTY'``: fail the write if table not empty.
+ :param triggering_frequency:
+ The time in seconds between write commits. Should only be specified
+ for streaming pipelines. Defaults to 5 seconds.
+ :param use_at_least_once:
+ Use at-least-once semantics. Is cheaper and provides lower latency,
+ but will potentially duplicate records.
+ :param expansion_service:
+ The address (host:port) of the expansion service. If no expansion
+ service is provided, will attempt to run the default GCP expansion
+ service.
+ """
+ super().__init__()
+ self._table = table
+ self._create_disposition = create_disposition
+ self._write_disposition = write_disposition
+ self._triggering_frequency = triggering_frequency
+ self._use_at_least_once = use_at_least_once
+ self._expansion_service = (
+ expansion_service or _default_io_expansion_service())
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ opts = input.pipeline.options.view_as(StandardOptions)
+ # TODO(https://github.com/apache/beam/issues/21307): Add support for
+ # OnWindowExpiration to more runners. Storage Write API requires
+ # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+ streaming_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
I don't think we should add a runner check like this to I/O transforms. In
Beam code we try to keep transform and runner logic separate (so even if
there's an issue, someone could fix it elsewhere and transforms will just
work).
Also, any idea what's broken for other runners in this case (for the sink)?
##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+ dependsOn ':sdks:python:installGcpTest'
+
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m
apache_beam.runners.portability.local_job_service_main --job_port ${jobPort}
--pid_file ${pidFile} --background --stdout_file
${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m
apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile}
--stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks
from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject =
project.project(':sdks:java:io:google-cloud-platform:expansion-service')
Review Comment:
I see. Probably expand the comment above to clarify that.
##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+ dependsOn ':sdks:python:installGcpTest'
+
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m
apache_beam.runners.portability.local_job_service_main --job_port ${jobPort}
--pid_file ${pidFile} --background --stdout_file
${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m
apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile}
--stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks
from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject =
project.project(':sdks:java:io:google-cloud-platform:expansion-service')
+// Properties that are common across runners.
+// Used to launch the expansion service, collect the right tests, and cleanup
afterwards
+def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
+ name = "gcpCrossLanguage"
+ expansionProjectPath = gcpExpansionProject.getPath()
+ collectMarker = "uses_gcp_java_expansion_service"
+ startJobServer = setupTask
+ cleanupJobServer = cleanupTask
+}
+xlangTasks.add(gcpXlangCommon)
+
+// ******** Java _____ expansion service ********
Review Comment:
Remove the commented-out code?
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /**
***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as
parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use
Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping
createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration :
new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
+ def expansionJar =
project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+ def javaClassLookupAllowlistFile =
project.project(config.expansionProjectPath).projectDir.getPath()
+ def expansionServiceOpts = [
+ "group_id": project.name,
+ "java_expansion_service_jar": expansionJar,
+ "java_port": javaExpansionPort,
+ "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ ]
+ def serviceArgs =
project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+ def javaContainerSuffix
+ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
Review Comment:
Ah, yeah, these are configs for Jenkins vs Gradle. Let's keep as is for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]