voonhous commented on code in PR #13480:
URL: https://github.com/apache/hudi/pull/13480#discussion_r2185428515
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java:
##########
@@ -78,14 +81,29 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context
context) {
// setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
- conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
// set up default parallelism
OptionsInference.setupSinkTasks(conf,
dataStream.getExecutionConfig().getParallelism());
// set up client id
OptionsInference.setupClientId(conf);
// set up index related configs
OptionsInference.setupIndexConfigs(conf);
+ // Since Flink 2.0, the adaptive execution for batch job will generate
job graph incrementally
+ // for multiple stages (FLIP-469). And the write coordinator is
initialized along with write
+ // operator in the final stage, so hudi table should be initialized if
necessary during the plan
+ // compilation phase when adaptive execution is enabled.
Review Comment:
I don't quite understand this — what happens in batch execution if it is not
Flink 2?
##########
.github/workflows/bot.yml:
##########
@@ -1035,6 +1021,55 @@ jobs:
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
./packaging/bundle-validation/ci_run.sh hudi_docker_java17
$HUDI_VERSION openjdk17
+ # Flink 2.0 only supports Java 11 and above
+ validate-bundles-java11:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ include:
+ - scalaProfile: 'scala-2.12'
+ flinkProfile: 'flink2.0'
+ flinkAvroVersion: '1.11.4'
+ flinkParquetVersion: '1.14.4'
+ sparkProfile: 'spark3.5'
+ sparkRuntime: 'spark3.5.1'
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up JDK 11
+ uses: actions/setup-java@v3
+ with:
+ java-version: '11'
+ distribution: 'temurin'
+ architecture: x64
+ - name: Build Project
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
+ FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
+ run: |
+ mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE"
-DskipTests=true $MVN_ARGS -pl packaging/hudi-flink-bundle -am
-Davro.version="$FLINK_AVRO_VERSION" -Dparquet.version="$FLINK_PARQUET_VERSION"
+ - name: IT - Bundle Validation - OpenJDK 11
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ run: |
+ HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
+ ./packaging/bundle-validation/ci_run.sh hudi_docker_java11
$HUDI_VERSION openjdk11
+ - name: IT - Bundle Validation - OpenJDK 17
+ env:
+ FLINK_PROFILE: ${{ matrix.flinkProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ run: |
+ HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
Review Comment:
Just to check: we're compiling the Hudi dependencies with Java 11, then verifying
that the resulting bundle can run in a Java 17 environment here, right?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########
@@ -73,7 +72,28 @@ public static HoodieFlinkWriteClient
createWriteClient(Configuration conf) throw
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// build the write client to start the embedded timeline server
final HoodieFlinkWriteClient writeClient = new
HoodieFlinkWriteClient<>(new
HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)),
writeConfig);
-
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+
writeClient.setOperationType(WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
+ // create the filesystem view storage properties for client
+ initViewStorageProperties(conf, writeConfig);
+ return writeClient;
+ }
+
+ /**
+ * Initialize the 'view_storage_conf' meta file.
+ *
+ * <p>This expects to be used by the driver, the client can then send
requests for files view.
Review Comment:
files view -> filesystem views
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]