TheNeuralBit commented on a change in pull request #14586:
URL: https://github.com/apache/beam/pull/14586#discussion_r623400965



##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
##########
@@ -242,6 +242,7 @@ public int compareTo(TimerData that) {
       }
       ComparisonChain chain =
           ComparisonChain.start()
+              .compare(this.getOutputTimestamp(), that.getOutputTimestamp())

Review comment:
       nit: this looks unrelated

##########
File path: sdks/java/extensions/arrow/build.gradle
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.arrow')
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Arrow"
+
+def arrow_version = "3.0.0"

Review comment:
       We may want to go ahead and bump this to 4.0.0 which was [just released](https://lists.apache.org/thread.html/rfb4e0065347235571cd0e0e71a7fa38c9652f07e27f98f89b4398feb%40%3Cdev.arrow.apache.org%3E)

##########
File path: sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** BeamSQL provides a new interface to run a SQL statement with Beam. */

Review comment:
       ```suggestion
   /** Extensions for using Apache Arrow with Beam. */
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageArrowReader.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;

Review comment:
       Is it possible to refactor this so we only need to use 
`org.apache.arrow` in sdks/java/extensions/arrow, and not in the GCP IOs?
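One way to structure such a refactor (a sketch only, with hypothetical names — `ArrowBatchDecoder` and `decodeRows` are not real Beam APIs) is to put a small facade interface in the arrow extension module so that the GCP IO depends only on Beam-level types, while all `org.apache.arrow` machinery stays behind the facade's implementation:

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical facade that would live in sdks/java/extensions/arrow. The GCP IO
// would program against this interface only; the Arrow VectorSchemaRoot /
// MessageSerializer machinery stays inside the implementation.
interface ArrowBatchDecoder<RowT> extends AutoCloseable {
  // Decode one serialized record batch into rows of a Beam-level type.
  Iterator<RowT> decodeRows(byte[] serializedBatch);

  @Override
  void close();
}

public class ArrowFacadeSketch {
  // Stand-in implementation so the sketch is self-contained; a real one would
  // deserialize an Arrow RecordBatch and convert its vectors to Beam Rows.
  static ArrowBatchDecoder<String> fakeDecoder() {
    return new ArrowBatchDecoder<String>() {
      @Override
      public Iterator<String> decodeRows(byte[] batch) {
        return Arrays.asList(new String(batch).split(",")).iterator();
      }

      @Override
      public void close() {}
    };
  }

  public static void main(String[] args) {
    // The caller (the IO connector) never touches an org.apache.arrow type.
    try (ArrowBatchDecoder<String> decoder = fakeDecoder()) {
      Iterator<String> rows = decoder.decodeRows("a,b,c".getBytes());
      while (rows.hasNext()) {
        System.out.println(rows.next());
      }
    }
  }
}
```

With this shape, the Arrow dependency of the GCP IO becomes an implementation detail of one factory call rather than a set of direct imports.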

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -132,6 +135,9 @@
             .setMaxStreamCount(streamCount)
             .build();
 
+    /*if (DataFormat.ARROW.equals(format)) {
+      requestBuilder.setFormat(DataFormat.ARROW);
+    }*/

Review comment:
       nit: this looks like dead code

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
##########
@@ -48,6 +49,7 @@
       @Nullable String location,
       @Nullable String queryTempDataset,
       @Nullable String kmsKey,
+      DataFormat format,

Review comment:
       Should this be `@Nullable`? It looks like you pass null to this argument 
in `BigQueryIOStorageQueryTest`

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
##########
@@ -37,7 +38,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-public class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
+class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {

Review comment:
       Why make this package-private?

##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -77,11 +77,6 @@
   <td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
   <td>Default: <code>false</code></td>
 </tr>
-<tr>
-  <td><code>filesToStage</code></td>
-  <td>Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath.</td>
-  <td></td>
-</tr>

Review comment:
       Can you revert this change (and the equivalent Python one)? I think 
something automated is trying to make this change, but we should do it in a 
separate PR.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -601,8 +602,17 @@ public static Read read() {
 
     @Override
     public TableRow apply(SchemaAndRecord schemaAndRecord) {
-      return BigQueryAvroUtils.convertGenericRecordToTableRow(
-          schemaAndRecord.getRecord(), schemaAndRecord.getTableSchema());
+      // TODO(BEAM-9114): Implement a function to encapsulate row conversion logic.
+      try {
+        return BigQueryAvroUtils.convertGenericRecordToTableRow(
+            schemaAndRecord.getRecord(), schemaAndRecord.getTableSchema());
+      } catch (IllegalStateException i) {
+        if (schemaAndRecord.getRow() != null) {
+          return BigQueryUtils.toTableRow().apply(schemaAndRecord.getRow());
+        }
+        throw new IllegalStateException(
+            "Record should be of instance GenericRecord (for Avro format) or of instance Row (for Arrow format), but it is not.");
+      }

Review comment:
       This feels pretty hacky. Ideally I think we would use `Row` as an 
intermediate format that can be backed by either an Avro `GenericRecord` or an 
Arrow `RecordBatch`. But if that's too tricky to get right, we should at least 
avoid branching within a `DoFn` like this, since it will mean checking the type 
of every single element. We could instead produce different logic at pipeline 
construction time.
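The construction-time alternative could look roughly like this (a simplified sketch with stand-in types — `parseFnFor`, `fromAvro`, and `fromArrowRow` are hypothetical names, not the PR's actual methods): the format is inspected once when the pipeline is built, and the resulting function is applied uniformly to every element with no per-element type check.

```java
import java.util.function.Function;

public class ConversionChoiceSketch {
  // Stand-in for com.google.cloud.bigquery.storage.v1.DataFormat.
  enum DataFormat { AVRO, ARROW }

  // Stand-ins for the two real conversion paths (GenericRecord -> TableRow
  // and Row -> TableRow).
  static String fromAvro(Object record) { return "avro:" + record; }
  static String fromArrowRow(Object row) { return "arrow:" + row; }

  // Resolved once at pipeline construction time, based on the configured
  // format; the per-element code just applies the chosen function.
  static Function<Object, String> parseFnFor(DataFormat format) {
    switch (format) {
      case ARROW:
        return ConversionChoiceSketch::fromArrowRow;
      case AVRO:
      default:
        return ConversionChoiceSketch::fromAvro;
    }
  }

  public static void main(String[] args) {
    Function<Object, String> parseFn = parseFnFor(DataFormat.ARROW);
    // Every element goes through the same branch-free function.
    System.out.println(parseFn.apply("r1"));
  }
}
```

In Beam the chosen function would need to be a `SerializableFunction` passed into the `DoFn`, but the shape is the same: the `switch` runs once per pipeline, not once per record.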

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -525,6 +525,11 @@ public static TableRow toTableRow(Row row) {
         return toTableRow((Row) fieldValue);
 
       case DATETIME:
+        if (fieldValue instanceof Long) {
+          return Instant.ofEpochMilli((long) fieldValue)
+              .toDateTime(DateTimeZone.UTC)
+              .toString(BIGQUERY_TIMESTAMP_PRINTER);
+        }

Review comment:
       Why is this necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

