Copilot commented on code in PR #12327:
URL: https://github.com/apache/gluten/pull/12327#discussion_r3510546430


##########
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:
##########
@@ -242,11 +245,45 @@ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boo
         assertThat(checkJobRunningStatus(insertResult, 30000) == true);
       } else {
         waitForJobCompletion(insertResult, 30000);
+        if ("q10_orc.sql".equals(queryFileName)) {
+          verifyQ10OrcOutput(queryStartMillis);
+        }
       }
     }
     assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty());
   }
 
+  private void verifyQ10OrcOutput(long queryStartMillis) {
+    Path outputDir = Paths.get("/tmp/data/output/bid");
+    assertTrue("Q10 ORC output directory should exist", Files.exists(outputDir));
+    try {
+      List<Path> outputFiles;
+      try (java.util.stream.Stream<Path> files = Files.walk(outputDir)) {
+        outputFiles =
+            files
+                .filter(Files::isRegularFile)
+                .filter(path -> isModifiedAfter(path, queryStartMillis))
+                .sorted()
+                .collect(Collectors.toList());
+      }

Review Comment:
   `verifyQ10OrcOutput` currently asserts every newly-modified regular file 
under the output dir has size > 0. With `'sink.partition-commit.policy.kind' = 
'success-file'`, the sink can create empty success marker files (e.g., 
`_SUCCESS`), which would make this test fail even when ORC data files are 
written correctly. Filter to ORC data files instead of all regular files.
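
   A minimal sketch of such a filter, not the PR's code. `OrcOutputFilter` and
   its methods are hypothetical names, and it assumes marker and in-progress
   files can be recognised by a leading `_` or `.` in the file name, which
   covers `_SUCCESS`. Matching on a `.orc` suffix would only work if the sink
   is configured to write that suffix.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the PR.
final class OrcOutputFilter {
  private OrcOutputFilter() {}

  // Assumption: marker files (e.g. _SUCCESS) and in-progress files start with '_' or '.'.
  static boolean isDataFile(Path path) {
    String name = path.getFileName().toString();
    return !name.startsWith("_") && !name.startsWith(".");
  }

  // Walks the output directory and returns only regular files that look like data files.
  static List<Path> listDataFiles(Path outputDir) throws IOException {
    try (Stream<Path> files = Files.walk(outputDir)) {
      return files
          .filter(Files::isRegularFile)
          .filter(OrcOutputFilter::isDataFile)
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```

   The size > 0 assertion would then run over `listDataFiles(outputDir)` only,
   so an empty `_SUCCESS` marker no longer fails the test.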



##########
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:
##########
@@ -71,7 +72,7 @@ public class NexmarkTest {
         }
       };
 
-  private static final int KAFKA_PORT = 19092;
+  private static final int KAFKA_PORT = Integer.getInteger("gluten.flink.ut.kafka.port", 9092);

Review Comment:
   Defaulting the embedded Kafka listener to 9092, Kafka's standard broker 
port, increases the chance of local/CI port collisions. The previous default 
(19092) is less likely to be occupied, and overrides via 
`gluten.flink.ut.kafka.port` would still work.
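
   As a sketch of the suggestion: keep the system-property override but restore
   the old default. `KafkaPortConfig` and `resolvePort()` are hypothetical names
   used only for illustration; the property name and both port numbers come
   from the diff.

```java
// Hypothetical holder class, not part of the PR.
final class KafkaPortConfig {
  static final String PORT_PROPERTY = "gluten.flink.ut.kafka.port";
  static final int DEFAULT_PORT = 19092; // previous default, less likely to be in use

  private KafkaPortConfig() {}

  // Integer.getInteger reads the system property and falls back to the default
  // when the property is unset or not a valid integer.
  static int resolvePort() {
    return Integer.getInteger(PORT_PROPERTY, DEFAULT_PORT);
  }
}
```

   Running the tests with `-Dgluten.flink.ut.kafka.port=9092` would still select
   9092 explicitly where that is wanted.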



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java:
##########
@@ -234,28 +262,49 @@ public void processWatermark2(Watermark mark) throws Exception {
     processElementInternal();
   }
 
+  @Override
+  public void endInput(int inputId) throws Exception {
+    switch (inputId) {
+      case 1:
+        leftInputEnded = true;
+        if (leftInputQueue != null) {
+          leftInputQueue.noMoreInput();
+        }
+        break;
+      case 2:
+        rightInputEnded = true;
+        if (rightInputQueue != null) {
+          rightInputQueue.noMoreInput();
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown input id: " + inputId);
+    }
+  }

Review Comment:
   `endInput(int)` sets `leftInputEnded`/`rightInputEnded` and calls 
`noMoreInput()`, but never finishes or drains the Velox task once both inputs 
have ended. As a result, bounded jobs may not flush their remaining output and 
may not terminate. The newly added `finishTask()` is never called, and the 
`*InputEnded` flags are set but never read.
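
   One way to wire the flags up, as a hedged sketch rather than the operator's
   real code. `TwoInputEndTracker` is a hypothetical stand-in, and the
   `Runnable` passed to it represents whatever `finishTask()` does in the
   operator; its actual signature is not shown in this diff.

```java
// Hypothetical stand-in for the end-of-input bookkeeping in a two-input operator.
final class TwoInputEndTracker {
  private final Runnable finishTask; // assumption: wraps the operator's finishTask()
  private boolean leftInputEnded;
  private boolean rightInputEnded;
  private boolean finished;

  TwoInputEndTracker(Runnable finishTask) {
    this.finishTask = finishTask;
  }

  void endInput(int inputId) {
    switch (inputId) {
      case 1:
        leftInputEnded = true;
        break;
      case 2:
        rightInputEnded = true;
        break;
      default:
        throw new IllegalArgumentException("Unknown input id: " + inputId);
    }
    // Finish exactly once, and only after both sides have reported end of input.
    if (leftInputEnded && rightInputEnded && !finished) {
      finished = true;
      finishTask.run();
    }
  }
}
```

   The `finished` guard keeps a repeated `endInput` call from finishing the
   task twice.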



##########
gluten-flink/ut/pom.xml:
##########
@@ -195,6 +195,24 @@
       <version>${flink.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-orc</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.25.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.7.4</version>
+      <scope>test</scope>
+    </dependency>

Review Comment:
   Hard-coding Hadoop to 2.7.4 here prevents any parent/profile override of 
`${hadoop.version}` (the root pom defines multiple Hadoop-version profiles). 
Use the shared property (or omit the version if dependencyManagement is used) 
so Flink UTs follow the selected Hadoop version.
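
   For example, assuming the `hadoop.version` property defined by the root pom
   profiles is visible to this module, the dependency could read:

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <!-- Assumption: hadoop.version is inherited from the parent pom / active profile. -->
  <version>${hadoop.version}</version>
  <scope>test</scope>
</dependency>
```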



##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java:
##########
@@ -259,10 +285,18 @@ public void processWatermark2(Watermark mark) throws Exception {
     throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator");
   }
 
+  @Override
+  public void endInput() throws Exception {
+    if (inputQueue != null) {
+      inputQueue.noMoreInput();
+    }
+  }

Review Comment:
   `endInput()` only signals `inputQueue.noMoreInput()` but never 
drains/finishes the Velox task. For bounded inputs (via `BoundedOneInput`), 
this can leave pending output un-emitted and prevent the job from reaching a 
finished state. You already introduced `finishTask()` for this purpose, but it 
is currently unused.
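
   The intended end-of-input sequence (signal, then drain) can be sketched as
   follows. Everything here is hypothetical: `FakeTask` is a toy stand-in for
   the native task that holds rows back until input ends, and it does not
   reflect the real Velox or Gluten APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration of "signal end of input, then drain remaining output".
final class BoundedDrainSketch {
  private BoundedDrainSketch() {}

  /** Toy stand-in for a native task that buffers rows until input has ended. */
  static final class FakeTask {
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean noMoreInput;

    void add(String row) {
      pending.add(row);
    }

    void noMoreInput() {
      noMoreInput = true;
    }

    // Returns the next buffered row, or null if nothing can be emitted yet or anymore.
    String poll() {
      return noMoreInput ? pending.poll() : null;
    }
  }

  // Signals end of input, then drains every remaining row so nothing is left un-emitted.
  static List<String> endInput(FakeTask task) {
    task.noMoreInput();
    List<String> emitted = new ArrayList<>();
    for (String row = task.poll(); row != null; row = task.poll()) {
      emitted.add(row);
    }
    return emitted;
  }
}
```

   In the operator, the drain step is presumably the role of `finishTask()`,
   called from `endInput()` after `inputQueue.noMoreInput()`.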



##########
gluten-flink/ut/pom.xml:
##########
@@ -195,6 +195,24 @@
       <version>${flink.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-orc</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.25.5</version>
+      <scope>test</scope>
+    </dependency>

Review Comment:
   Hard-coding the Protobuf version here bypasses the repo-wide 
`protobuf.version` property and can introduce dependency skew across 
modules/profiles. Prefer using the shared property so this test dependency 
stays consistent with the rest of the build.
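
   For example, assuming the repo-wide `protobuf.version` property is inherited
   by this module:

```xml
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <!-- Assumption: protobuf.version is defined in the parent pom. -->
  <version>${protobuf.version}</version>
  <scope>test</scope>
</dependency>
```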



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

