Copilot commented on code in PR #12327:
URL: https://github.com/apache/gluten/pull/12327#discussion_r3510546430
##########
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:
##########
@@ -242,11 +245,45 @@ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boo
assertThat(checkJobRunningStatus(insertResult, 30000) == true);
} else {
waitForJobCompletion(insertResult, 30000);
+ if ("q10_orc.sql".equals(queryFileName)) {
+ verifyQ10OrcOutput(queryStartMillis);
+ }
}
}
assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty());
}
+ private void verifyQ10OrcOutput(long queryStartMillis) {
+ Path outputDir = Paths.get("/tmp/data/output/bid");
+ assertTrue("Q10 ORC output directory should exist", Files.exists(outputDir));
+ try {
+ List<Path> outputFiles;
+ try (java.util.stream.Stream<Path> files = Files.walk(outputDir)) {
+ outputFiles =
+ files
+ .filter(Files::isRegularFile)
+ .filter(path -> isModifiedAfter(path, queryStartMillis))
+ .sorted()
+ .collect(Collectors.toList());
+ }
Review Comment:
`verifyQ10OrcOutput` currently asserts that every newly modified regular file
under the output directory has size > 0. With `'sink.partition-commit.policy.kind' =
'success-file'`, the sink can create empty success marker files (e.g.,
`_SUCCESS`), so this test would fail even when the ORC data files are
written correctly. Filter to ORC data files instead of all regular files.
##########
gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java:
##########
@@ -71,7 +72,7 @@ public class NexmarkTest {
}
};
- private static final int KAFKA_PORT = 19092;
+ private static final int KAFKA_PORT = Integer.getInteger("gluten.flink.ut.kafka.port", 9092);
Review Comment:
Defaulting the embedded Kafka listener to port 9092 increases the chance of
local/CI port collisions: 9092 is Kafka's default broker port, so any locally
running broker would already hold it. The previous default (19092) is less likely
to be occupied and can still be overridden via `gluten.flink.ut.kafka.port`.
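A sketch of the suggested default, assuming the test keeps reading `gluten.flink.ut.kafka.port` through `Integer.getInteger`. `KafkaTestPort` and its `isFree` probe are hypothetical helpers; the probe is only best-effort, since another process can take the port between the probe and broker startup.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class KafkaTestPort {
  // System property that lets CI or developers override the embedded broker port.
  static final String PORT_PROPERTY = "gluten.flink.ut.kafka.port";

  // Previous default; avoids Kafka's well-known 9092, which a local broker may already hold.
  static final int DEFAULT_PORT = 19092;

  private KafkaTestPort() {}

  static int resolve() {
    // Integer.getInteger returns the default when the property is unset or not a valid integer.
    return Integer.getInteger(PORT_PROPERTY, DEFAULT_PORT);
  }

  // Best-effort, racy probe: true if the port could be bound on the wildcard address just now.
  static boolean isFree(int port) {
    try (ServerSocket probe = new ServerSocket(port)) {
      return true;
    } catch (IOException e) {
      return false;
    }
  }
}
```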
##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java:
##########
@@ -234,28 +262,49 @@ public void processWatermark2(Watermark mark) throws Exception {
processElementInternal();
}
+ @Override
+ public void endInput(int inputId) throws Exception {
+ switch (inputId) {
+ case 1:
+ leftInputEnded = true;
+ if (leftInputQueue != null) {
+ leftInputQueue.noMoreInput();
+ }
+ break;
+ case 2:
+ rightInputEnded = true;
+ if (rightInputQueue != null) {
+ rightInputQueue.noMoreInput();
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown input id: " + inputId);
+ }
+ }
Review Comment:
`endInput(int)` sets `leftInputEnded`/`rightInputEnded` and calls
`noMoreInput()`, but never drains or finishes the Velox task once both inputs
have ended. As a result, bounded jobs may not flush their remaining output and
may never terminate. The newly added `finishTask()` is never called, and the
`*InputEnded` flags are written but never read.
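A sketch of how `endInput(int)` could gate `finishTask()` on both flags, assuming `finishTask()` drains the Velox task and must run exactly once. `InputQueue` and the `events` list are hypothetical stand-ins so the logic runs without Flink or Velox.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoInputEndOfInput {
  // Hypothetical stand-in for the operator's per-side input queue.
  interface InputQueue {
    void noMoreInput();
  }

  private final InputQueue leftInputQueue;
  private final InputQueue rightInputQueue;
  private boolean leftInputEnded;
  private boolean rightInputEnded;
  private boolean taskFinished;
  // Records finishTask() calls so the behaviour can be checked in isolation.
  final List<String> events = new ArrayList<>();

  TwoInputEndOfInput(InputQueue leftInputQueue, InputQueue rightInputQueue) {
    this.leftInputQueue = leftInputQueue;
    this.rightInputQueue = rightInputQueue;
  }

  void endInput(int inputId) {
    switch (inputId) {
      case 1:
        leftInputEnded = true;
        if (leftInputQueue != null) {
          leftInputQueue.noMoreInput();
        }
        break;
      case 2:
        rightInputEnded = true;
        if (rightInputQueue != null) {
          rightInputQueue.noMoreInput();
        }
        break;
      default:
        throw new IllegalArgumentException("Unknown input id: " + inputId);
    }
    // A two-input operator such as a join can only emit its final rows once BOTH
    // sides are exhausted, and the native task should be finished exactly once.
    if (leftInputEnded && rightInputEnded && !taskFinished) {
      taskFinished = true;
      finishTask();
    }
  }

  // Hypothetical hook: the real operator would drain the Velox task and emit its output here.
  void finishTask() {
    events.add("finishTask");
  }
}
```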
##########
gluten-flink/ut/pom.xml:
##########
@@ -195,6 +195,24 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.25.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.7.4</version>
+ <scope>test</scope>
+ </dependency>
Review Comment:
Hard-coding Hadoop to 2.7.4 here prevents any parent/profile override of
`${hadoop.version}` (the root pom defines multiple Hadoop-version profiles).
Use the shared property (or omit the version if it is managed in
`dependencyManagement`) so the Flink UTs follow the selected Hadoop version.
##########
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java:
##########
@@ -259,10 +285,18 @@ public void processWatermark2(Watermark mark) throws Exception {
throw new UnsupportedOperationException("Not implemented for GlutenOneInputOperator");
}
+ @Override
+ public void endInput() throws Exception {
+ if (inputQueue != null) {
+ inputQueue.noMoreInput();
+ }
+ }
Review Comment:
`endInput()` only signals `inputQueue.noMoreInput()` and never drains or
finishes the Velox task. For bounded inputs (via `BoundedOneInput`), this can
leave pending output un-emitted and prevent the job from reaching a finished
state. You already introduced `finishTask()` for this purpose, but it is never
called.
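A sketch of the one-input case, assuming a hypothetical `NativeTask` whose `pollOutput()` returns buffered rows after `noMoreInput()` and `null` once exhausted. The real Gluten/Velox task API likely differs (output may be produced asynchronously, for example), so this only shows the shape of the fix: signal end of input, then flush before `endInput()` returns.

```java
import java.util.function.Consumer;

final class OneInputEndOfInput<T> {
  // Hypothetical view of the native task; NOT the actual Gluten/Velox API.
  interface NativeTask<R> {
    void noMoreInput();

    // Assumed contract: next buffered row after noMoreInput(), or null once exhausted.
    R pollOutput();
  }

  private final NativeTask<T> task;
  private final Consumer<T> output;

  OneInputEndOfInput(NativeTask<T> task, Consumer<T> output) {
    this.task = task;
    this.output = output;
  }

  // Mirrors the shape of BoundedOneInput#endInput: signal end of input, then flush.
  void endInput() {
    if (task != null) {
      task.noMoreInput();
      finishTask();
    }
  }

  // Drains everything the task still buffers so no output is left un-emitted.
  private void finishTask() {
    T row;
    while ((row = task.pollOutput()) != null) {
      output.accept(row);
    }
  }
}
```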
##########
gluten-flink/ut/pom.xml:
##########
@@ -195,6 +195,24 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.25.5</version>
+ <scope>test</scope>
+ </dependency>
Review Comment:
Hard-coding the Protobuf version here bypasses the repo-wide
`protobuf.version` property and can introduce dependency skew across
modules/profiles. Prefer using the shared property so this test dependency
stays consistent with the rest of the build.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.