This is an automated email from the ASF dual-hosted git repository.
xxubai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 9126f7318 [AMORO-3853] Support Java 17 build for Amoro (#4244)
9126f7318 is described below
commit 9126f7318adbe70e6d30dacfc5994e2f28f4e32e
Author: Xu Bai <[email protected]>
AuthorDate: Wed Jun 24 18:51:50 2026 +0800
[AMORO-3853] Support Java 17 build for Amoro (#4244)
* Refactor flink module for JDK 17 support
Add JDK 17 compatibility
* Update SLF4J dependency to version 2.0.17 and adjust README for JDK
compatibility
* Fix CI configuration to properly format JDK matrix in core-hadoop3-ci.yml
* Fix CI configuration to properly format JDK matrix in core-hadoop3-ci.yml
* Stabilize continuous optimizing tests on Java 17
* Update TestAutomaticLogWriter to use truncated LocalDateTime for
consistency in time calculations
* Remove reflective invocation of emitPeriodicWatermark in
MixedFormatSourceReader
* use FlinkWriteResult for consistency
* Add locality exposure in IcebergClassUtil and implement tests for file
committer
* Remove Java 17 from CI build matrix and update wrapKrb method to accept
nullable startSnapshotId
* spotless
* address
---
.github/workflows/core-hadoop3-ci.yml | 7 +-
README.md | 2 +-
amoro-format-hudi/pom.xml | 2 -
.../amoro-mixed-flink-common/pom.xml | 10 +-
.../interceptor/KerberosInvocationHandler.java | 1 -
.../hybrid/reader/MixedFormatSourceReader.java | 38 ++--
.../org/apache/amoro/flink/table/FlinkSource.java | 35 ++-
.../flink/table/KerberosAwareInputFormat.java | 165 ++++++++++++++
.../amoro/flink/table/MixedFormatTableLoader.java | 247 ++++++++++++++++++++-
.../table/UnkeyedInputFormatOperatorFactory.java | 8 +-
.../table/UnkeyedStreamingReaderOperator.java | 175 +++++++++++++++
.../amoro/flink/util/FlinkClassReflectionUtil.java | 65 ------
.../apache/amoro/flink/util/IcebergClassUtil.java | 119 +++++-----
.../org/apache/amoro/flink/write/FlinkSink.java | 8 +-
.../apache/amoro/flink/table/TestWatermark.java | 66 +-----
.../amoro/flink/util/TestIcebergClassUtil.java | 75 +++++++
.../amoro/flink/write/TestAutomaticLogWriter.java | 14 +-
.../flink/write/TestMixedFormatFileWriter.java | 11 +-
amoro-format-mixed/amoro-mixed-trino/pom.xml | 1 -
amoro-format-paimon/pom.xml | 9 +
pom.xml | 65 +++++-
21 files changed, 881 insertions(+), 242 deletions(-)
diff --git a/.github/workflows/core-hadoop3-ci.yml
b/.github/workflows/core-hadoop3-ci.yml
index e93a32aa4..aebb929f0 100644
--- a/.github/workflows/core-hadoop3-ci.yml
+++ b/.github/workflows/core-hadoop3-ci.yml
@@ -37,8 +37,13 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- jdk: [ '11' ]
+ jdk: [ '11', '17' ]
spark: [ '3.3','3.4', '3.5' ]
+ exclude:
+ - jdk: '17'
+ spark: '3.3'
+ - jdk: '17'
+ spark: '3.4'
name: Build Amoro with JDK ${{ matrix.jdk }} Spark-${{ matrix.spark }}
steps:
- uses: actions/checkout@v3
diff --git a/README.md b/README.md
index e07afe7b0..4746c444e 100644
--- a/README.md
+++ b/README.md
@@ -116,7 +116,7 @@ Amoro contains modules as below:
## Building
-Amoro is built using Maven with JDK 8, 11 and 17(required for
`amoro-format-mixed/amoro-mixed-trino` module).
+Amoro is built using Maven with JDK 11 and 17(required for
`amoro-format-mixed/amoro-mixed-trino` module, experimental for other modules).
* Build all modules without `amoro-mixed-trino`: `./mvnw clean package`
* Build and skip tests: `./mvnw clean package -DskipTests`
diff --git a/amoro-format-hudi/pom.xml b/amoro-format-hudi/pom.xml
index ace84c0a1..7e40a2695 100644
--- a/amoro-format-hudi/pom.xml
+++ b/amoro-format-hudi/pom.xml
@@ -30,8 +30,6 @@
<name>Amoro Project Hudi Format</name>
<properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
index 60d5d7b36..64b8a5f19 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/pom.xml
@@ -324,6 +324,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.amoro</groupId>
+ <artifactId>amoro-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.amoro</groupId>
<artifactId>amoro-mixed-hive</artifactId>
@@ -422,7 +430,7 @@
<value>org.apache.amoro.listener.AmoroRunListener</value>
</property>
</properties>
- <argLine>-verbose:class</argLine>
+ <argLine>${amoro.surefire.baseArgLine}</argLine>
</configuration>
</plugin>
<plugin>
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java
index 25dce7fb0..7349a61cb 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/interceptor/KerberosInvocationHandler.java
@@ -56,7 +56,6 @@ public class KerberosInvocationHandler<T> implements
InvocationHandler, Serializ
authenticatedFileIO.doAs(
() -> {
try {
- method.setAccessible(true);
return method.invoke(obj, args);
} catch (Throwable e) {
throw new RuntimeException(e);
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java
index caa7b6f83..eea767ae8 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/read/hybrid/reader/MixedFormatSourceReader.java
@@ -23,11 +23,9 @@ import
org.apache.amoro.flink.read.hybrid.enumerator.InitializationFinishedEvent
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplit;
import org.apache.amoro.flink.read.hybrid.split.MixedFormatSplitState;
import org.apache.amoro.flink.read.hybrid.split.SplitRequestEvent;
-import org.apache.amoro.flink.util.FlinkClassReflectionUtil;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
@@ -35,13 +33,13 @@ import
org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.core.io.InputStatus;
-import
org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks;
import
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -132,25 +130,21 @@ public class MixedFormatSourceReader<T>
return new MixedFormatReaderOutput<>(output);
}
- /**
- * There is a case that the watermark in {@link
WatermarkOutputMultiplexer.OutputState} has been
- * updated, but watermark has not been emitted for that when {@link
- * WatermarkOutputMultiplexer#onPeriodicEmit} called, the outputState has
been removed by {@link
- * WatermarkOutputMultiplexer#unregisterOutput(String)} after split
finished. Wrap {@link
- * ReaderOutput} to call {@link
- *
ProgressiveTimestampsAndWatermarks.SplitLocalOutputs#emitPeriodicWatermark()}
when split
- * finishes.
- */
+ /** Wrap split outputs so we can flush any pending periodic watermark before
release. */
static class MixedFormatReaderOutput<T> implements ReaderOutput<T> {
private final ReaderOutput<T> internal;
+ private final SourceOutputWithWatermarks<T> watermarkOutput;
+ private final Map<String, SourceOutput<T>> splitOutputs = new HashMap<>();
+ @SuppressWarnings("unchecked")
public MixedFormatReaderOutput(ReaderOutput<T> readerOutput) {
Preconditions.checkArgument(
readerOutput instanceof SourceOutputWithWatermarks,
"readerOutput should be SourceOutputWithWatermarks, but was %s",
readerOutput.getClass());
this.internal = readerOutput;
+ this.watermarkOutput = (SourceOutputWithWatermarks<T>) readerOutput;
}
@Override
@@ -180,14 +174,28 @@ public class MixedFormatSourceReader<T>
@Override
public SourceOutput<T> createOutputForSplit(String splitId) {
- return internal.createOutputForSplit(splitId);
+ SourceOutput<T> splitOutput = internal.createOutputForSplit(splitId);
+ splitOutputs.put(splitId, splitOutput);
+ return splitOutput;
}
@Override
public void releaseOutputForSplit(String splitId) {
- Object splitLocalOutput =
FlinkClassReflectionUtil.getSplitLocalOutput(internal);
- FlinkClassReflectionUtil.emitPeriodWatermark(splitLocalOutput);
+ emitPeriodicWatermark(splitOutputs.remove(splitId));
internal.releaseOutputForSplit(splitId);
}
+
+ private void emitPeriodicWatermark(SourceOutput<T> splitOutput) { // flushes a periodic watermark for one split's output
+ if (splitOutput == null) { // caller passes the result of a map remove(), which may be null
+ return;
+ }
+
+ if (splitOutput instanceof SourceOutputWithWatermarks) { // the split output can emit its own periodic watermark
+ ((SourceOutputWithWatermarks<T>) splitOutput).emitPeriodicWatermark();
+ return;
+ }
+
+ watermarkOutput.emitPeriodicWatermark(); // otherwise fall back to the reader-level output cast in the constructor
+ }
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java
index 3e4080e8a..2a89f81ed 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/FlinkSource.java
@@ -40,7 +40,6 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSource;
import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -49,13 +48,17 @@ import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -243,37 +246,55 @@ public class FlinkSource {
.properties(properties)
.flinkConf(flinkConf)
.limit(limit);
+ Long startSnapshotId = null;
if
(MixedFormatValidator.SCAN_STARTUP_MODE_LATEST.equalsIgnoreCase(scanStartupMode))
{
Optional<Snapshot> startSnapshotOptional =
Optional.ofNullable(tableLoader.loadTable().currentSnapshot());
if (startSnapshotOptional.isPresent()) {
Snapshot snapshot = startSnapshotOptional.get();
+ startSnapshotId = snapshot.snapshotId();
LOG.info(
"Get starting snapshot id {} based on scan startup mode {}",
snapshot.snapshotId(),
scanStartupMode);
- builder.startSnapshotId(snapshot.snapshotId());
+ builder.startSnapshotId(startSnapshotId);
}
}
DataStream<RowData> origin = builder.build();
- return wrapKrb(origin).assignTimestampsAndWatermarks(watermarkStrategy);
+ return wrapKrb(origin,
startSnapshotId).assignTimestampsAndWatermarks(watermarkStrategy);
}
/** extract op from dataStream, and wrap krb support */
- private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
+ private DataStream<RowData> wrapKrb(DataStream<RowData> ds, @Nullable Long
startSnapshotId) {
IcebergClassUtil.clean(env);
Transformation origin = ds.getTransformation();
int scanParallelism =
flinkConf
.getOptional(MixedFormatValidator.SCAN_PARALLELISM)
.orElse(origin.getParallelism());
+ Table table = mixedTable.asUnkeyedTable();
+ Schema projectedIcebergSchema =
+ projectedSchema == null
+ ? mixedTable.schema()
+ : FlinkSchemaUtil.convert(
+ mixedTable.schema(),
+
org.apache.amoro.flink.FlinkSchemaUtil.filterWatermark(projectedSchema));
if (origin instanceof OneInputTransformation) {
OneInputTransformation<RowData, RowData> tf =
(OneInputTransformation<RowData, RowData>) ds.getTransformation();
- OneInputStreamOperatorFactory op = (OneInputStreamOperatorFactory)
tf.getOperatorFactory();
ProxyFactory<FlinkInputFormat> inputFormatProxyFactory =
- IcebergClassUtil.getInputFormatProxyFactory(op, mixedTable.io(),
mixedTable.schema());
+ IcebergClassUtil.getInputFormatProxyFactory(
+ tableLoader,
+ table,
+ mixedTable.io(),
+ mixedTable.schema(),
+ projectedIcebergSchema,
+ flinkConf,
+ properties,
+ filters,
+ limit,
+ startSnapshotId);
if (tf.getInputs().isEmpty()) {
return env.addSource(
@@ -305,7 +326,7 @@ public class FlinkSource {
(InputFormatSourceFunction)
IcebergClassUtil.getSourceFunction(source);
InputFormat inputFormatProxy =
- (InputFormat) ProxyUtil.getProxy(function.getFormat(),
mixedTable.io());
+ new KerberosAwareInputFormat<>(function.getFormat(),
mixedTable.io());
DataStreamSource sourceStream =
env.createInput(inputFormatProxy, tfSource.getOutputType())
.setParallelism(scanParallelism);
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java
new file mode 100644
index 000000000..92d3687d9
--- /dev/null
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/KerberosAwareInputFormat.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.flink.table;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * A concrete {@link InputFormat} wrapper that runs delegate calls inside
{@link
+ * AuthenticatedFileIO#doAs(Callable)} without using JDK dynamic proxies.
+ *
+ * <p>Methods that declare {@link IOException} catch any {@link RuntimeException} raised by the
+ * {@code doAs} call and rethrow its cause when that cause is an {@link IOException}; any other
+ * runtime exception propagates unchanged. {@code openInputFormat} and {@code closeInputFormat}
+ * are forwarded only when the delegate is itself a {@link RichInputFormat}.
+ */
+public class KerberosAwareInputFormat<OT, T extends InputSplit> extends
RichInputFormat<OT, T> {
+ private static final long serialVersionUID = 1L;
+
+ private final InputFormat<OT, T> delegate; // wrapped format; every method below forwards to it
+ private final AuthenticatedFileIO authenticatedFileIO; // provides doAs(...) around each delegate call
+
+ public KerberosAwareInputFormat(
+ InputFormat<OT, T> delegate, AuthenticatedFileIO authenticatedFileIO) {
+ this.delegate = delegate;
+ this.authenticatedFileIO = authenticatedFileIO;
+ }
+
+ @Override
+ public void configure(org.apache.flink.configuration.Configuration
parameters) { // declares no IOException, so there is no unwrap step here
+ authenticatedFileIO.doAs(
+ () -> {
+ delegate.configure(parameters);
+ return null; // the lambda passed to doAs must return a value; the delegate call is void
+ });
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws
IOException {
+ try {
+ return authenticatedFileIO.doAs(() ->
delegate.getStatistics(cachedStatistics));
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e); // rethrow the IOException cause if there is one
+ }
+ }
+
+ @Override
+ public T[] createInputSplits(int minNumSplits) throws IOException {
+ try {
+ return authenticatedFileIO.doAs(() ->
delegate.createInputSplits(minNumSplits));
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(T[] inputSplits) { // declares no IOException; runtime exceptions pass through as-is
+ return authenticatedFileIO.doAs(() ->
delegate.getInputSplitAssigner(inputSplits));
+ }
+
+ @Override
+ public void open(T split) throws IOException {
+ try {
+ authenticatedFileIO.doAs(
+ () -> {
+ delegate.open(split);
+ return null;
+ });
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ try {
+ return authenticatedFileIO.doAs(delegate::reachedEnd);
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ @Override
+ public OT nextRecord(OT reuse) throws IOException {
+ try {
+ return authenticatedFileIO.doAs(() -> delegate.nextRecord(reuse));
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ authenticatedFileIO.doAs(
+ () -> {
+ delegate.close();
+ return null;
+ });
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ @Override
+ public void openInputFormat() throws IOException {
+ if (!(delegate instanceof RichInputFormat)) { // only RichInputFormat delegates receive this call
+ return;
+ }
+ RichInputFormat<OT, T> richInputFormat = (RichInputFormat<OT, T>) delegate;
+ richInputFormat.setRuntimeContext(getRuntimeContext()); // pass this wrapper's runtime context to the delegate before opening it
+ try {
+ authenticatedFileIO.doAs(
+ () -> {
+ richInputFormat.openInputFormat();
+ return null;
+ });
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ @Override
+ public void closeInputFormat() throws IOException {
+ if (!(delegate instanceof RichInputFormat)) { // same guard as openInputFormat
+ return;
+ }
+ RichInputFormat<OT, T> richInputFormat = (RichInputFormat<OT, T>) delegate;
+ try {
+ authenticatedFileIO.doAs(
+ () -> {
+ richInputFormat.closeInputFormat();
+ return null;
+ });
+ } catch (RuntimeException e) {
+ throw unwrapIOException(e);
+ }
+ }
+
+ private IOException unwrapIOException(RuntimeException exception) { // returns the IOException for the caller to throw
+ Throwable cause = exception.getCause();
+ if (cause instanceof IOException) {
+ return (IOException) cause;
+ }
+ throw exception; // cause is not an IOException: rethrow the original runtime exception
+ }
+}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
index d7282739f..d8a79243c 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/MixedFormatTableLoader.java
@@ -21,14 +21,33 @@ package org.apache.amoro.flink.table;
import org.apache.amoro.flink.InternalCatalogBuilder;
import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
import
org.apache.amoro.flink.interceptor.FlinkTablePropertiesInvocationHandler;
+import org.apache.amoro.hive.table.SupportHive;
import org.apache.amoro.mixed.MixedFormatCatalog;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
+import org.apache.amoro.table.BasicKeyedTable;
+import org.apache.amoro.table.BasicUnkeyedTable;
+import org.apache.amoro.table.ChangeTable;
+import org.apache.amoro.table.KeyedTable;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.UnkeyedTable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.util.StructLikeMap;
import java.io.IOException;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@@ -93,7 +112,7 @@ public class MixedFormatTableLoader implements TableLoader {
Boolean loadBaseForKeyedTable) {
this.catalogBuilder = catalogBuilder;
this.tableIdentifier = tableIdentifier;
- this.flinkTableProperties = flinkTableProperties;
+ this.flinkTableProperties = new HashMap<>(flinkTableProperties);
this.loadBaseForKeyedTable = loadBaseForKeyedTable == null ||
loadBaseForKeyedTable;
}
@@ -108,10 +127,8 @@ public class MixedFormatTableLoader implements TableLoader
{
}
public MixedTable loadMixedFormatTable() {
- return ((MixedTable)
- new FlinkTablePropertiesInvocationHandler(
- flinkTableProperties,
mixedFormatCatalog.loadTable(tableIdentifier))
- .getProxy());
+ MixedTable table = mixedFormatCatalog.loadTable(tableIdentifier);
+ return wrapWithFlinkTableProperties(table, flinkTableProperties);
}
public void switchLoadInternalTableForKeyedTable(boolean
loadBaseForKeyedTable) {
@@ -142,6 +159,13 @@ public class MixedFormatTableLoader implements TableLoader
{
tableIdentifier, catalogBuilder, flinkTableProperties,
loadBaseForKeyedTable);
}
+ public MixedFormatTableLoader copyWithFlinkTableProperties(Map<String,
String> extraProperties) { // builds a new loader whose Flink table properties include the extras
+ Map<String, String> merged = new HashMap<>(flinkTableProperties); // start from a copy of this loader's properties
+ merged.putAll(extraProperties); // extra entries replace existing values for the same key
+ return new MixedFormatTableLoader(
+ tableIdentifier, catalogBuilder, merged, loadBaseForKeyedTable); // this loader's own fields are not modified
+ }
+
@Override
public void close() throws IOException {}
@@ -149,4 +173,217 @@ public class MixedFormatTableLoader implements
TableLoader {
public String toString() {
return MoreObjects.toStringHelper(this).add("tableIdentifier",
tableIdentifier).toString();
}
+
+ @VisibleForTesting
+ static MixedTable wrapWithFlinkTableProperties(
+ MixedTable mixedTable, Map<String, String> flinkTableProperties) { // picks a wrapper that overlays flinkTableProperties on the table
+ if (flinkTableProperties == null || flinkTableProperties.isEmpty()) { // nothing to overlay: return the table as given
+ return mixedTable;
+ }
+
+ if (mixedTable instanceof SupportHive) { // SupportHive tables still go through the invocation-handler proxy
+ return (MixedTable)
+ new FlinkTablePropertiesInvocationHandler(flinkTableProperties,
mixedTable).getProxy();
+ }
+
+ if (mixedTable.isUnkeyedTable()) { // unkeyed: concrete wrapper class instead of a proxy
+ return new FlinkTablePropertiesUnkeyedTable(
+ mixedTable.asUnkeyedTable(), flinkTableProperties);
+ }
+
+ if (mixedTable.isKeyedTable()) { // keyed: concrete wrapper class instead of a proxy
+ return new FlinkTablePropertiesKeyedTable(mixedTable.asKeyedTable(),
flinkTableProperties);
+ }
+
+ return mixedTable; // neither keyed nor unkeyed: returned unwrapped
+ }
+
+ private static class FlinkTablePropertiesSupport implements Serializable { // holds the Flink table properties and merges them over a table's own
+ private static final long serialVersionUID = 1L;
+
+ protected final Map<String, String> flinkTableProperties;
+
+ protected FlinkTablePropertiesSupport(Map<String, String>
flinkTableProperties) {
+ this.flinkTableProperties = new HashMap<>(flinkTableProperties); // private copy; later changes to the argument are not seen
+ }
+
+ protected Map<String, String> withFlinkTableProperties(Map<String, String>
tableProperties) { // returns a fresh map on every call; neither input map is modified
+ Map<String, String> merged = new HashMap<>(tableProperties); // the table's own properties first
+ merged.putAll(flinkTableProperties); // Flink-side values replace table values for the same key
+ return merged;
+ }
+ }
+
+ private static class FlinkTablePropertiesUnkeyedTable extends
BasicUnkeyedTable
+ implements UnkeyedTable { // concrete wrapper over an UnkeyedTable; only properties() differs from the delegate
+ private static final long serialVersionUID = 1L;
+
+ private final UnkeyedTable delegate; // every overridden method below forwards to this table
+ private final FlinkTablePropertiesSupport propertiesSupport;
+
+ private FlinkTablePropertiesUnkeyedTable(
+ UnkeyedTable delegate, Map<String, String> flinkTableProperties) {
+ super(delegate.id(), delegate, delegate.io(), null); // NOTE(review): confirm which BasicUnkeyedTable parameter receives this null
+ this.delegate = delegate;
+ this.propertiesSupport = new
FlinkTablePropertiesSupport(flinkTableProperties);
+ }
+
+ @Override
+ public Map<String, String> properties() { // delegate's properties with the Flink table properties merged on top
+ return propertiesSupport.withFlinkTableProperties(delegate.properties());
+ }
+
+ @Override
+ public void refresh() { // this and all remaining overrides forward to the delegate unchanged
+ delegate.refresh();
+ }
+
+ @Override
+ public UpdateSchema updateSchema() {
+ return delegate.updateSchema();
+ }
+
+ @Override
+ public AppendFiles newAppend() {
+ return delegate.newAppend();
+ }
+
+ @Override
+ public AppendFiles newFastAppend() {
+ return delegate.newFastAppend();
+ }
+
+ @Override
+ public RewriteFiles newRewrite() {
+ return delegate.newRewrite();
+ }
+
+ @Override
+ public OverwriteFiles newOverwrite() {
+ return delegate.newOverwrite();
+ }
+
+ @Override
+ public RowDelta newRowDelta() {
+ return delegate.newRowDelta();
+ }
+
+ @Override
+ public ReplacePartitions newReplacePartitions() {
+ return delegate.newReplacePartitions();
+ }
+
+ @Override
+ public DeleteFiles newDelete() {
+ return delegate.newDelete();
+ }
+
+ @Override
+ public ExpireSnapshots expireSnapshots() {
+ return delegate.expireSnapshots();
+ }
+
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ return delegate.manageSnapshots();
+ }
+
+ @Override
+ public Transaction newTransaction() {
+ return delegate.newTransaction();
+ }
+
+ @Override
+ public StructLikeMap<Map<String, String>> partitionProperty() {
+ return delegate.partitionProperty();
+ }
+
+ @Override
+ public org.apache.amoro.op.UpdatePartitionProperties
updatePartitionProperties(
+ Transaction transaction) {
+ return delegate.updatePartitionProperties(transaction);
+ }
+ }
+
+ private static class FlinkTablePropertiesKeyedTable extends BasicKeyedTable
+ implements KeyedTable { // concrete wrapper over a KeyedTable that merges Flink table properties into properties()
+ private static final long serialVersionUID = 1L;
+
+ private final KeyedTable delegate;
+ private final FlinkTablePropertiesSupport propertiesSupport;
+
+ private FlinkTablePropertiesKeyedTable(
+ KeyedTable delegate, Map<String, String> flinkTableProperties) {
+ super(
+ delegate.location(),
+ delegate.primaryKeySpec(),
+ new FlinkTablePropertiesBaseTable(delegate.baseTable(),
flinkTableProperties), // base table wrapped with the same property overlay
+ new FlinkTablePropertiesChangeTable(delegate.changeTable(),
flinkTableProperties)); // change table wrapped with the same property overlay
+ this.delegate = delegate;
+ this.propertiesSupport = new
FlinkTablePropertiesSupport(flinkTableProperties);
+ }
+
+ @Override
+ public Map<String, String> properties() { // delegate's properties with the Flink table properties merged on top
+ return propertiesSupport.withFlinkTableProperties(delegate.properties());
+ }
+
+ @Override
+ public void refresh() { // forwards to the wrapped keyed table
+ delegate.refresh();
+ }
+ }
+
+ private static class FlinkTablePropertiesBaseTable extends
BasicKeyedTable.BaseInternalTable { // base-table wrapper built by FlinkTablePropertiesKeyedTable's constructor
+ private static final long serialVersionUID = 1L;
+
+ private final UnkeyedTable delegate;
+ private final FlinkTablePropertiesSupport propertiesSupport;
+
+ private FlinkTablePropertiesBaseTable(
+ UnkeyedTable delegate, Map<String, String> flinkTableProperties) {
+ super(delegate.id(), delegate, delegate.io(), null); // NOTE(review): confirm which BaseInternalTable parameter receives this null
+ this.delegate = delegate;
+ this.propertiesSupport = new
FlinkTablePropertiesSupport(flinkTableProperties);
+ }
+
+ @Override
+ public Map<String, String> properties() { // delegate's properties with the Flink table properties merged on top
+ return propertiesSupport.withFlinkTableProperties(delegate.properties());
+ }
+
+ @Override
+ public void refresh() { // forwards to the wrapped base table
+ delegate.refresh();
+ }
+ }
+
+ private static class FlinkTablePropertiesChangeTable extends
BasicKeyedTable.ChangeInternalTable { // change-table wrapper built by FlinkTablePropertiesKeyedTable's constructor
+ private static final long serialVersionUID = 1L;
+
+ private final ChangeTable delegate;
+ private final FlinkTablePropertiesSupport propertiesSupport;
+
+ private FlinkTablePropertiesChangeTable(
+ ChangeTable delegate, Map<String, String> flinkTableProperties) {
+ super(delegate.id(), delegate, delegate.io(), null); // NOTE(review): confirm which ChangeInternalTable parameter receives this null
+ this.delegate = delegate;
+ this.propertiesSupport = new
FlinkTablePropertiesSupport(flinkTableProperties);
+ }
+
+ @Override
+ public Map<String, String> properties() { // delegate's properties with the Flink table properties merged on top
+ return propertiesSupport.withFlinkTableProperties(delegate.properties());
+ }
+
+ @Override
+ public void refresh() { // forwards to the wrapped change table
+ delegate.refresh();
+ }
+
+ @Override
+ public org.apache.amoro.scan.ChangeTableIncrementalScan newScan() { // scan is created by the wrapped change table
+ return delegate.newScan();
+ }
+ }
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java
index 644ef1f80..ad0ce2269 100644
---
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedInputFormatOperatorFactory.java
@@ -19,7 +19,6 @@
package org.apache.amoro.flink.table;
import org.apache.amoro.flink.interceptor.ProxyFactory;
-import org.apache.amoro.flink.util.IcebergClassUtil;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -29,7 +28,6 @@ import
org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
-import org.apache.iceberg.flink.source.StreamingReaderOperator;
public class UnkeyedInputFormatOperatorFactory extends
AbstractStreamOperatorFactory<RowData>
implements YieldingOperatorFactory<RowData>,
@@ -52,8 +50,8 @@ public class UnkeyedInputFormatOperatorFactory extends
AbstractStreamOperatorFac
@Override
public <O extends StreamOperator<RowData>> O createStreamOperator(
StreamOperatorParameters<RowData> parameters) {
- StreamingReaderOperator operator =
- IcebergClassUtil.newStreamingReaderOperator(
+ UnkeyedStreamingReaderOperator operator =
+ new UnkeyedStreamingReaderOperator(
factory.getInstance(), processingTimeService, mailboxExecutor);
operator.setup(
parameters.getContainingTask(), parameters.getStreamConfig(),
parameters.getOutput());
@@ -62,6 +60,6 @@ public class UnkeyedInputFormatOperatorFactory extends
AbstractStreamOperatorFac
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader
classLoader) {
- return StreamingReaderOperator.class;
+ return UnkeyedStreamingReaderOperator.class;
}
}
diff --git
a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java
new file mode 100644
index 000000000..7d4310e35
--- /dev/null
+++
b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.flink.table;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.flink.source.FlinkInputFormat;
+import org.apache.iceberg.flink.source.FlinkInputSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+
+/**
+ * Reader operator for Amoro's unkeyed Iceberg file source.
+ *
+ * <p>This mirrors Iceberg's non-public {@code StreamingReaderOperator} behavior while allowing
+ * Amoro to create the {@code FlinkInputFormat} through {@link
+ * org.apache.amoro.flink.interceptor.ProxyFactory} at operator runtime. This keeps the
+ * Kerberos-aware file IO wrapper without reflectively depending on Iceberg's package-private
+ * operator factory or private constructor.
+ */
+public class UnkeyedStreamingReaderOperator extends
AbstractStreamOperator<RowData>
+ implements OneInputStreamOperator<FlinkInputSplit, RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(UnkeyedStreamingReaderOperator.class);
+
+ private final MailboxExecutor executor;
+ private FlinkInputFormat format;
+ private transient SourceFunction.SourceContext<RowData> sourceContext;
+ private transient ListState<FlinkInputSplit> inputSplitsState;
+ private transient Queue<FlinkInputSplit> splits;
+ private transient SplitState currentSplitState;
+
+ public UnkeyedStreamingReaderOperator(
+ FlinkInputFormat format, ProcessingTimeService timeService,
MailboxExecutor mailboxExecutor) {
+ this.format = Preconditions.checkNotNull(format, "The InputFormat should
not be null.");
+ this.processingTimeService = timeService;
+ this.executor =
+ Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor
should not be null.");
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ inputSplitsState =
+ context
+ .getOperatorStateStore()
+ .getListState(new ListStateDescriptor<>("splits", new
JavaSerializer<>()));
+ currentSplitState = SplitState.IDLE;
+ splits = Lists.newLinkedList();
+ if (context.isRestored()) {
+ int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
+ LOG.info("Restoring state for the {} (taskIdx: {}).",
getClass().getSimpleName(), taskIdx);
+ for (FlinkInputSplit split : inputSplitsState.get()) {
+ splits.add(split);
+ }
+ }
+
+ sourceContext =
+ StreamSourceContexts.getSourceContext(
+ getOperatorConfig().getTimeCharacteristic(),
+ getProcessingTimeService(),
+ new Object(),
+ output,
+
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+ -1L,
+ true);
+ enqueueProcessSplits();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ inputSplitsState.clear();
+ inputSplitsState.addAll(Lists.newArrayList(splits));
+ }
+
+ @Override
+ public void processElement(StreamRecord<FlinkInputSplit> element) {
+ splits.add(element.getValue());
+ enqueueProcessSplits();
+ }
+
+ private void enqueueProcessSplits() {
+ if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
+ currentSplitState = SplitState.RUNNING;
+ executor.execute(
+ (ThrowingRunnable<IOException>) this::processSplits,
getClass().getSimpleName());
+ }
+ }
+
+ private void processSplits() throws IOException {
+ FlinkInputSplit split = splits.poll();
+ if (split == null) {
+ currentSplitState = SplitState.IDLE;
+ return;
+ }
+
+ format.open(split);
+ try {
+ RowData next = null;
+ while (!format.reachedEnd()) {
+ next = format.nextRecord(next);
+ sourceContext.collect(next);
+ }
+ } finally {
+ currentSplitState = SplitState.IDLE;
+ format.close();
+ }
+ enqueueProcessSplits();
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) {}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (format != null) {
+ format.close();
+ format.closeInputFormat();
+ format = null;
+ }
+ sourceContext = null;
+ }
+
+ @Override
+ public void finish() throws Exception {
+ super.finish();
+ output.close();
+ if (sourceContext != null) {
+ sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+ sourceContext.close();
+ sourceContext = null;
+ }
+ }
+
+ private enum SplitState {
+ IDLE,
+ RUNNING
+ }
+}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java
deleted file mode 100644
index 6b181b510..000000000
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/FlinkClassReflectionUtil.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.amoro.flink.util;
-
-import org.apache.flink.api.connector.source.ReaderOutput;
-import
org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-/** A util class to handle the reflection operation of Flink class. */
-public class FlinkClassReflectionUtil {
-
- public static final Logger LOG =
LoggerFactory.getLogger(FlinkClassReflectionUtil.class);
-
- public static Object getSplitLocalOutput(ReaderOutput readerOutput) {
- if (readerOutput == null) {
- return null;
- }
- try {
- return ReflectionUtil.getField(
- (Class<ReaderOutput>)
ProgressiveTimestampsAndWatermarks.class.getDeclaredClasses()[2],
- readerOutput,
- "splitLocalOutputs");
- } catch (Exception e) {
- LOG.warn("extract internal watermark error", e);
- }
- return null;
- }
-
- public static void emitPeriodWatermark(@Nullable Object splitLocalOutput) {
- if (splitLocalOutput == null) {
- return;
- }
- try {
- Method method =
-
ProgressiveTimestampsAndWatermarks.class.getDeclaredClasses()[1].getDeclaredMethod(
- "emitPeriodicWatermark");
- method.setAccessible(true);
- method.invoke(splitLocalOutput);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
- LOG.warn("no method found", e);
- }
- }
-}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java
index adfd4383b..76a4555e6 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/util/IcebergClassUtil.java
@@ -20,31 +20,31 @@ package org.apache.amoro.flink.util;
import org.apache.amoro.flink.interceptor.ProxyFactory;
import org.apache.amoro.io.AuthenticatedFileIO;
-import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkWriteResult;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.ScanContext;
-import org.apache.iceberg.flink.source.StreamingReaderOperator;
+import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.ThreadPools;
import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;
@@ -52,8 +52,6 @@ import java.util.Map;
/** An util generates Apache Iceberg writer and committer operator w */
public class IcebergClassUtil {
- private static final String ICEBERG_SCAN_CONTEXT_CLASS =
- "org.apache.iceberg.flink.source.ScanContext";
private static final String ICEBERG_PARTITION_SELECTOR_CLASS =
"org.apache.iceberg.flink.sink.PartitionKeySelector";
private static final String ICEBERG_FILE_COMMITTER_CLASS =
@@ -66,7 +64,6 @@ public class IcebergClassUtil {
try {
Class<?> clazz = forName(ICEBERG_PARTITION_SELECTOR_CLASS);
Constructor<?> c = clazz.getConstructor(PartitionSpec.class,
Schema.class, RowType.class);
- c.setAccessible(true);
return (KeySelector<RowData, Object>) c.newInstance(spec, schema,
flinkSchema);
} catch (NoSuchMethodException
| IllegalAccessException
@@ -129,51 +126,22 @@ public class IcebergClassUtil {
new Object[] {fullTableName, taskWriterFactory});
}
- public static StreamingReaderOperator newStreamingReaderOperator(
- FlinkInputFormat format, ProcessingTimeService timeService,
MailboxExecutor mailboxExecutor) {
- try {
- Constructor<StreamingReaderOperator> c =
- StreamingReaderOperator.class.getDeclaredConstructor(
- FlinkInputFormat.class, ProcessingTimeService.class,
MailboxExecutor.class);
- c.setAccessible(true);
- return c.newInstance(format, timeService, mailboxExecutor);
- } catch (IllegalAccessException
- | NoSuchMethodException
- | InvocationTargetException
- | InstantiationException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static FlinkInputFormat getInputFormat(OneInputStreamOperatorFactory
operatorFactory) {
- try {
- Class<?>[] classes = StreamingReaderOperator.class.getDeclaredClasses();
- Class<?> clazz = null;
- for (Class<?> c : classes) {
- if ("OperatorFactory".equals(c.getSimpleName())) {
- clazz = c;
- break;
- }
- }
- Field field = clazz.getDeclaredField("format");
- field.setAccessible(true);
- return (FlinkInputFormat) (field.get(operatorFactory));
- } catch (IllegalAccessException | NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
- }
-
public static ProxyFactory<FlinkInputFormat> getInputFormatProxyFactory(
- OneInputStreamOperatorFactory operatorFactory,
+ TableLoader tableLoader,
+ Table table,
AuthenticatedFileIO authenticatedFileIO,
- Schema tableSchema) {
- FlinkInputFormat inputFormat = getInputFormat(operatorFactory);
- TableLoader tableLoader =
- ReflectionUtil.getField(FlinkInputFormat.class, inputFormat,
"tableLoader");
- FileIO io = ReflectionUtil.getField(FlinkInputFormat.class, inputFormat,
"io");
- EncryptionManager encryption =
- ReflectionUtil.getField(FlinkInputFormat.class, inputFormat,
"encryption");
- Object context = ReflectionUtil.getField(FlinkInputFormat.class,
inputFormat, "context");
+ Schema tableSchema,
+ Schema projectedSchema,
+ ReadableConfig flinkConf,
+ Map<String, String> properties,
+ List<Expression> filters,
+ long limit,
+ Long startSnapshotId) {
+ FileIO io = table.io();
+ EncryptionManager encryption = table.encryption();
+ ScanContext context =
+ buildScanContext(
+ table, projectedSchema, flinkConf, properties, filters, limit,
startSnapshotId);
return ProxyUtil.getProxyFactory(
FlinkInputFormat.class,
@@ -184,6 +152,39 @@ public class IcebergClassUtil {
new Object[] {tableLoader, tableSchema, io, encryption, context});
}
+ private static ScanContext buildScanContext(
+ Table table,
+ Schema projectedSchema,
+ ReadableConfig flinkConf,
+ Map<String, String> properties,
+ List<Expression> filters,
+ long limit,
+ Long startSnapshotId) {
+ ScanContext.Builder contextBuilder =
+ ScanContext.builder().resolveConfig(table, properties, flinkConf);
+ contextBuilder.exposeLocality(isLocalityEnabled(table, flinkConf));
+ if (projectedSchema != null) {
+ contextBuilder.project(projectedSchema);
+ }
+ if (filters != null) {
+ contextBuilder.filters(filters);
+ }
+ contextBuilder.limit(limit);
+ if (startSnapshotId != null) {
+ contextBuilder.startSnapshotId(startSnapshotId);
+ }
+ return contextBuilder.build();
+ }
+
+ private static boolean isLocalityEnabled(Table table, ReadableConfig
flinkConf) {
+ Boolean localityEnabled =
+
flinkConf.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
+ if (localityEnabled != null && !localityEnabled) {
+ return false;
+ }
+ return Util.mayHaveBlockLocations(table.io(), table.location());
+ }
+
private static Class<?> forName(String className) {
try {
return Class.forName(className);
@@ -193,22 +194,10 @@ public class IcebergClassUtil {
}
public static SourceFunction getSourceFunction(AbstractUdfStreamOperator
source) {
- try {
- Field field =
AbstractUdfStreamOperator.class.getDeclaredField("userFunction");
- field.setAccessible(true);
- return (SourceFunction) (field.get(source));
- } catch (IllegalAccessException | NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
+ return (SourceFunction) source.getUserFunction();
}
public static void clean(StreamExecutionEnvironment env) {
- try {
- Field field =
StreamExecutionEnvironment.class.getDeclaredField("transformations");
- field.setAccessible(true);
- ((List) (field.get(env))).clear();
- } catch (IllegalAccessException | NoSuchFieldException e) {
- throw new RuntimeException(e);
- }
+ env.getTransformations().clear();
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java
index 192e95801..31aca4cd2 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/write/FlinkSink.java
@@ -43,7 +43,6 @@ import org.apache.amoro.flink.table.descriptors.MixedFormatValidator;
import org.apache.amoro.flink.util.CompatibleFlinkPropertyUtil;
import org.apache.amoro.flink.util.IcebergClassUtil;
import org.apache.amoro.flink.util.MixedFormatUtils;
-import org.apache.amoro.flink.util.ProxyUtil;
import org.apache.amoro.table.DistributionHashMode;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableProperties;
@@ -435,10 +434,7 @@ public class FlinkSink {
return null;
}
tableLoader.switchLoadInternalTableForKeyedTable(MixedFormatUtils.isToBase(overwrite));
- return (OneInputStreamOperator)
- ProxyUtil.getProxy(
- IcebergClassUtil.newIcebergFilesCommitter(
- tableLoader, overwrite, branch, spec, mixedTable.io()),
- mixedTable.io());
+ return IcebergClassUtil.newIcebergFilesCommitter(
+ tableLoader, overwrite, branch, spec, mixedTable.io());
}
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java
index ae0dbe8c7..35b789bb8 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/table/TestWatermark.java
@@ -30,16 +30,8 @@ import org.apache.amoro.flink.util.MixedFormatUtils;
import org.apache.amoro.flink.util.TestUtil;
import org.apache.amoro.table.KeyedTable;
import org.apache.amoro.table.TableIdentifier;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
@@ -57,8 +49,6 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -68,12 +58,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
public class TestWatermark extends FlinkTestBase {
- public static final Logger LOG =
LoggerFactory.getLogger(TestWatermark.class);
-
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
private static final String DB = TableTestHelper.TEST_TABLE_ID.getDatabase();
@@ -147,17 +133,17 @@ public class TestWatermark extends FlinkTestBase {
"create table d (tt as cast(op_time as timestamp(3)), watermark for tt
as tt) like %s",
table);
- Table source = getTableEnv().sqlQuery("select is_true from d");
-
- WatermarkTestOperator op = new WatermarkTestOperator();
- getTableEnv()
- .toRetractStream(source, RowData.class)
- .transform("test watermark", TypeInformation.of(RowData.class), op);
- getEnv().executeAsync("test watermark");
-
- op.waitWatermark();
-
- Assert.assertTrue(op.watermark > Long.MIN_VALUE);
+ // This query verifies that a table with watermark definition can still be consumed
+ // correctly. We intentionally avoid waiting on an async watermark callback here because
+ // that path depends on internal source/operator timing and can hang the test without
+ // revealing a user-visible regression.
+ TableResult result = exec("select is_true from d");
+ CommonTestUtils.waitUntilJobManagerIsInitialized(
+ () -> result.getJobClient().get().getJobStatus().get());
+ try (CloseableIterator<Row> iterator = result.collect()) {
+ Assert.assertEquals(Row.of(true), iterator.next());
+ }
+ result.getJobClient().ifPresent(TestUtil::cancelJob);
}
@Test
@@ -226,34 +212,4 @@ public class TestWatermark extends FlinkTestBase {
expected.add(new Object[] {true,
LocalDateTime.parse("2022-06-17T10:08:11")});
Assert.assertEquals(DataUtil.toRowSet(expected), actual);
}
-
- public static class WatermarkTestOperator extends
AbstractStreamOperator<RowData>
- implements OneInputStreamOperator<Tuple2<Boolean, RowData>, RowData> {
-
- private static final long serialVersionUID = 1L;
- public long watermark;
- private static final CompletableFuture<Void> waitWatermark = new
CompletableFuture<>();
-
- public WatermarkTestOperator() {
- super();
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- private void waitWatermark() throws InterruptedException,
ExecutionException {
- waitWatermark.get();
- }
-
- @Override
- public void processElement(StreamRecord<Tuple2<Boolean, RowData>> element)
throws Exception {
- output.collect(element.asRecord());
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- LOG.info("processWatermark: {}", mark);
- watermark = mark.getTimestamp();
- waitWatermark.complete(null);
- super.processWatermark(mark);
- }
- }
}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java
new file mode 100644
index 000000000..0e654f2a0
--- /dev/null
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/util/TestIcebergClassUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.flink.util;
+
+import org.apache.amoro.io.AuthenticatedFileIO;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.flink.sink.FlinkWriteResult;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestIcebergClassUtil {
+
+ @Test
+ public void testIcebergFilesCommitterRunsThroughAuthenticatedFileIO() {
+ CountingAuthenticatedFileIO fileIO = new CountingAuthenticatedFileIO();
+ OneInputStreamOperator<FlinkWriteResult, Void> committer =
+ IcebergClassUtil.newIcebergFilesCommitter(
+ null, false, null, PartitionSpec.unpartitioned(), fileIO);
+
+ committer.toString();
+
+ Assert.assertEquals(1, fileIO.doAsCalls.get());
+ }
+
+ private static class CountingAuthenticatedFileIO implements
AuthenticatedFileIO {
+ private final AtomicInteger doAsCalls = new AtomicInteger();
+
+ @Override
+ public <T> T doAs(Callable<T> callable) {
+ doAsCalls.incrementAndGet();
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java
index b71cf8496..eb5722a51 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestAutomaticLogWriter.java
@@ -80,6 +80,7 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -172,26 +173,27 @@ public class TestAutomaticLogWriter extends FlinkTestBase {
LocalDateTime.parse("2022-06-18 10:10:11", dtf)
});
List<Object[]> catchUpExpects = new LinkedList<>();
+ LocalDateTime catchUpBaseTime =
LocalDateTime.now().truncatedTo(ChronoUnit.MICROS);
catchUpExpects.add(
new Object[] {
1000014,
"d",
- LocalDateTime.now().minusSeconds(3).toEpochSecond(ZoneOffset.UTC),
- LocalDateTime.now().minusSeconds(3)
+ catchUpBaseTime.minusSeconds(3).toEpochSecond(ZoneOffset.UTC),
+ catchUpBaseTime.minusSeconds(3)
});
catchUpExpects.add(
new Object[] {
1000021,
"d",
- LocalDateTime.now().minusSeconds(2).toEpochSecond(ZoneOffset.UTC),
- LocalDateTime.now().minusSeconds(2)
+ catchUpBaseTime.minusSeconds(2).toEpochSecond(ZoneOffset.UTC),
+ catchUpBaseTime.minusSeconds(2)
});
catchUpExpects.add(
new Object[] {
1000015,
"e",
- LocalDateTime.now().minusSeconds(1).toEpochSecond(ZoneOffset.UTC),
- LocalDateTime.now().minusSeconds(1)
+ catchUpBaseTime.minusSeconds(1).toEpochSecond(ZoneOffset.UTC),
+ catchUpBaseTime.minusSeconds(1)
});
expects.addAll(catchUpExpects);
diff --git a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java
index 7cf7a99eb..d7f543029 100644
--- a/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java
+++ b/amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/test/java/org/apache/amoro/flink/write/TestMixedFormatFileWriter.java
@@ -100,9 +100,12 @@ public class TestMixedFormatFileWriter extends FlinkTestBase {
boolean submitEmptySnapshots,
Long restoredCheckpointId)
throws Exception {
- tableLoader.open();
- MixedTable mixedTable = tableLoader.loadMixedFormatTable();
- mixedTable.properties().put(SUBMIT_EMPTY_SNAPSHOTS.key(),
String.valueOf(submitEmptySnapshots));
+ HashMap<String, String> extraProperties = new HashMap<>();
+ extraProperties.put(SUBMIT_EMPTY_SNAPSHOTS.key(),
String.valueOf(submitEmptySnapshots));
+ MixedFormatTableLoader writerTableLoader =
+ tableLoader.copyWithFlinkTableProperties(extraProperties);
+ writerTableLoader.open();
+ MixedTable mixedTable = writerTableLoader.loadMixedFormatTable();
MixedFormatFileWriter streamWriter =
FlinkSink.createFileWriter(
@@ -110,7 +113,7 @@ public class TestMixedFormatFileWriter extends FlinkTestBase {
null,
false,
(RowType) FLINK_SCHEMA.toRowDataType().getLogicalType(),
- tableLoader);
+ writerTableLoader);
TestOneInputStreamOperatorIntern<RowData, FlinkWriteResult> harness =
new TestOneInputStreamOperatorIntern<>(
streamWriter, 1, 1, 0, restoredCheckpointId, new
TestGlobalAggregateManager());
diff --git a/amoro-format-mixed/amoro-mixed-trino/pom.xml b/amoro-format-mixed/amoro-mixed-trino/pom.xml
index 202d74a0b..a91f845fe 100644
--- a/amoro-format-mixed/amoro-mixed-trino/pom.xml
+++ b/amoro-format-mixed/amoro-mixed-trino/pom.xml
@@ -653,7 +653,6 @@
<toolchains>
<jdk>
<version>17</version>
- <vendor>sun</vendor>
</jdk>
</toolchains>
</configuration>
diff --git a/amoro-format-paimon/pom.xml b/amoro-format-paimon/pom.xml
index 87b29c428..ff98e9559 100644
--- a/amoro-format-paimon/pom.xml
+++ b/amoro-format-paimon/pom.xml
@@ -78,4 +78,13 @@
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
diff --git a/pom.xml b/pom.xml
index 1b54c24bc..78659d508 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<commons-pool2.version>2.12.0</commons-pool2.version>
<commons-lang3.version>3.20.0</commons-lang3.version>
<commons-net.version>3.12.0</commons-net.version>
- <cglib.version>2.2.2</cglib.version>
+ <cglib.version>3.3.0</cglib.version>
<curator.version>5.7.0</curator.version>
<mockito.version>4.11.0</mockito.version>
<testcontainers.version>1.21.4</testcontainers.version>
@@ -171,6 +171,26 @@
<rocksdb-dependency-scope>compile</rocksdb-dependency-scope>
<lucene-dependency-scope>compile</lucene-dependency-scope>
<aliyun-sdk-dependency-scope>provided</aliyun-sdk-dependency-scope>
+
+ <jvm.module.opens>--add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens=java.base/java.io=ALL-UNNAMED
+ --add-opens=java.base/java.net=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+ --add-opens=java.base/sun.security.action=ALL-UNNAMED
+
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED</jvm.module.opens>
+
+ <jvm.module.exports>--add-exports=java.base/sun.net.util=ALL-UNNAMED
+ --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
+
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</jvm.module.exports>
+
+ <amoro.surefire.baseArgLine>-XX:+UseG1GC -Xms256m
-XX:+IgnoreUnrecognizedVMOptions ${jvm.module.opens}
${jvm.module.exports}</amoro.surefire.baseArgLine>
</properties>
<dependencyManagement>
@@ -1491,6 +1511,7 @@
<version>${maven-surefire-plugin.version}</version>
<configuration>
<failIfNoSpecifiedTests>false</failIfNoSpecifiedTests>
+ <argLine>${amoro.surefire.baseArgLine}</argLine>
</configuration>
</plugin>
<plugin>
@@ -1864,12 +1885,52 @@
<profile>
<id>java11</id>
<activation>
- <jdk>[11,)</jdk>
+ <jdk>[11,17)</jdk>
</activation>
<properties>
<java.target.version>11</java.target.version>
</properties>
</profile>
+ <profile>
+ <id>java17</id>
+ <activation>
+ <jdk>17</jdk>
+ </activation>
+ <properties>
+ <java.source.version>17</java.source.version>
+ <java.target.version>17</java.target.version>
+ </properties>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionalJOptions>
+
<additionalJOption>--add-exports=java.base/sun.net.util=ALL-UNNAMED</additionalJOption>
+
<additionalJOption>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</additionalJOption>
+
<additionalJOption>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</additionalJOption>
+ </additionalJOptions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>17</source>
+ <target>17</target>
+ <compilerArgs combine.children="append">
+
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
+
<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
+
<arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ </profile>
<profile>
<id>spark-3.3</id>
<properties>