apilloud commented on a change in pull request #14254:
URL: https://github.com/apache/beam/pull/14254#discussion_r595602239



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -229,24 +226,19 @@ public Duration getAllowedTimestampSkew() {
     public void processElement(
         @Element Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
         throws InterruptedException {
-      final Future<Value> valueFuture;
-
-      if (row.equals(previousRow)) {
-        valueFuture = previousFuture;
-      } else {
-        Map<String, Value> columns = new HashMap<>();
-        for (int i : referencedColumns) {
-          columns.put(
-              columnName(i),
-              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                  row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
-        }
-
-        valueFuture = stream.execute(columns, nullParams);
+      Map<String, Value> columns = new HashMap<>();
+      for (int i : checkArgumentNotNull(referencedColumns)) {

Review comment:
       If you initialize `referencedColumns` to a default value in the constructor 
(say `this.referencedColumns = ImmutableList.of()`), you could drop the 
`@Nullable` and remove the `checkArgumentNotNull`.
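
A minimal sketch of that idea, not the actual Beam class: `ReferencedColumnsSketch`, its `setup(List<Integer>)` signature, and the `"col" + i` naming are hypothetical, and `Collections.emptyList()` stands in for Guava's `ImmutableList.of()` so the snippet compiles without extra dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the DoFn under review; not the actual Beam class.
class ReferencedColumnsSketch {
  // Assigned in the constructor, so the field is never null and needs no @Nullable.
  private List<Integer> referencedColumns;

  ReferencedColumnsSketch() {
    // Collections.emptyList() stands in for Guava's ImmutableList.of().
    this.referencedColumns = Collections.emptyList();
  }

  // Plays the role of setup(): replaces the empty default with the real list.
  void setup(List<Integer> columns) {
    this.referencedColumns = Collections.unmodifiableList(new ArrayList<>(columns));
  }

  // Plays the role of processElement(): iterates without any null check.
  List<String> columnNames() {
    List<String> names = new ArrayList<>();
    for (int i : referencedColumns) {
      names.add("col" + i); // hypothetical naming, for illustration only
    }
    return names;
  }
}
```

Before `setup` runs, the loop simply iterates over an empty list, which is why the `checkArgumentNotNull` call becomes unnecessary.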

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -229,24 +226,19 @@ public Duration getAllowedTimestampSkew() {
     public void processElement(
         @Element Row row, @Timestamp Instant t, BoundedWindow w, OutputReceiver<Row> r)
         throws InterruptedException {
-      final Future<Value> valueFuture;
-
-      if (row.equals(previousRow)) {
-        valueFuture = previousFuture;
-      } else {
-        Map<String, Value> columns = new HashMap<>();
-        for (int i : referencedColumns) {
-          columns.put(
-              columnName(i),
-              ZetaSqlBeamTranslationUtils.toZetaSqlValue(
-                  row.getBaseValue(i, Object.class), inputSchema.getField(i).getType()));
-        }
-
-        valueFuture = stream.execute(columns, nullParams);
+      Map<String, Value> columns = new HashMap<>();
+      for (int i : checkArgumentNotNull(referencedColumns)) {
+        columns.put(
+            columnName(i),
+            ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+                row.getBaseValue(i, Object.class),
+                inputSchema.getField(i).getType()));
       }
-      previousRow = row;
 
-      @Nullable Queue<TimestampedFuture> pendingWindow = pending.get(w);
+      @NonNull
+      Future<Value> valueFuture = checkArgumentNotNull(stream).execute(columns, nullParams);
+
+      @Nullable Queue<TimestampedFuture> pendingWindow = checkArgumentNotNull(pending).get(w);

Review comment:
       Add `this.pending = new HashMap<>();` to the constructor; then you can 
drop the nullness check here too.
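
Roughly what I have in mind, as a sketch rather than the real code: `PendingMapSketch` is a hypothetical class, and `String` keys and `Integer` values stand in for `BoundedWindow` and `TimestampedFuture`. Note that the result of `get` can still be null for an unseen window, so only the check on the map itself goes away.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for the DoFn under review; not the actual Beam class.
class PendingMapSketch {
  // Assigned in the constructor, so the map itself is never null.
  private final Map<String, Queue<Integer>> pending;

  PendingMapSketch() {
    this.pending = new HashMap<>();
  }

  // Plays the role of processElement(): no null check on the map is needed,
  // but the lookup result may still be null for a window not seen before.
  void enqueue(String window, int value) {
    Queue<Integer> pendingWindow = pending.get(window);
    if (pendingWindow == null) {
      pendingWindow = new ArrayDeque<>();
      pending.put(window, pendingWindow);
    }
    pendingWindow.add(value);
  }

  // Number of queued values for a window; zero if the window is unknown.
  int pendingCount(String window) {
    Queue<Integer> queue = pending.get(window);
    return queue == null ? 0 : queue.size();
  }
}
```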

##########
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -201,23 +199,22 @@ public void setup() {
             ZetaSqlBeamTranslationUtils.toZetaSqlType(inputSchema.getField(i).getType()));
       }
 
-      exp = new PreparedExpression(sql);
-      exp.prepare(options);
+      PreparedExpression expression = new PreparedExpression(sql);
+      exp = expression;

Review comment:
       I'm not really a fan of this. I believe you can move `this.exp = new 
PreparedExpression(sql);` to the constructor so `exp` doesn't need to be 
`@Nullable`. (You'll also need to add the line to `teardown`, as a 
`PreparedExpression` can't be reused.) Then you should be able to revert the 
rest of the changes to this method. If that doesn't work, I'd prefer nullness 
checks in this method for consistency.
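
To illustrate the lifecycle I'm suggesting, here is a sketch under assumptions: `ExpressionLifecycleSketch` and `FakeExpression` are hypothetical, and `FakeExpression` only mimics the "cannot be reused once closed" behavior rather than the real `PreparedExpression` API.

```java
// Hypothetical stand-in for the DoFn under review; not the actual Beam class.
class ExpressionLifecycleSketch {

  // Hypothetical single-use resource that mimics an expression which cannot
  // be prepared again once it has been closed.
  static final class FakeExpression {
    private boolean prepared;
    private boolean closed;

    void prepare() {
      if (closed) {
        throw new IllegalStateException("expression cannot be reused");
      }
      prepared = true;
    }

    void close() {
      closed = true;
    }

    boolean isPrepared() {
      return prepared;
    }
  }

  private final String sql;
  // Assigned in the constructor and re-assigned in teardown(), so never null.
  private FakeExpression exp;

  ExpressionLifecycleSketch(String sql) {
    this.sql = sql;
    this.exp = new FakeExpression();
  }

  // Plays the role of setup(): prepares the already-constructed expression.
  void setup() {
    exp.prepare();
  }

  // Plays the role of teardown(): closes the used expression and installs a
  // fresh one so that a later setup() call does not hit a closed instance.
  void teardown() {
    exp.close();
    exp = new FakeExpression();
  }

  boolean isReady() {
    return exp.isPrepared();
  }

  String sql() {
    return sql;
  }
}
```

With this shape, `setup` can stay as it was before the change, because `exp` always refers to a fresh, unprepared instance when `setup` is called.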



