This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7e4148ab89 [flink] Avoids miscellaneous deprecated APIs for 2.0 Preview (#4590)
7e4148ab89 is described below
commit 7e4148ab890f1e62b448447216a9b9558bd025bc
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Nov 27 17:46:13 2024 +0800
[flink] Avoids miscellaneous deprecated APIs for 2.0 Preview (#4590)
---
.../java/org/apache/paimon/flink/FlinkFileIO.java | 24 +++++++++++++++++++--
.../paimon/flink/source/FileStoreSourceReader.java | 23 --------------------
.../flink/source/align/AlignedSourceReader.java | 2 +-
.../apache/paimon/flink/CatalogTableITCase.java | 25 ++++++++++++----------
.../paimon/flink/source/IteratorSourcesITCase.java | 4 ++--
.../flink/util/MiniClusterWithClientExtension.java | 6 ------
.../paimon/flink/util/ReadWriteTableTestUtil.java | 6 +++---
7 files changed, 42 insertions(+), 48 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index 74512409bf..617d25125f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -29,10 +29,10 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.fs.FileSystemKind;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Locale;
/** Flink {@link FileIO} to use {@link FileSystem}. */
public class FlinkFileIO implements FileIO {
@@ -48,7 +48,27 @@ public class FlinkFileIO implements FileIO {
@Override
public boolean isObjectStore() {
try {
- return path.getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM;
+ FileSystem fs = path.getFileSystem();
+ String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);
+
+ if (scheme.startsWith("s3")
+ || scheme.startsWith("emr")
+ || scheme.startsWith("oss")
+ || scheme.startsWith("wasb")
+ || scheme.startsWith("gs")) {
+ // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
+ // or Google Cloud Storage
+ return true;
+ } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+ // file servers instead of file systems
+ // they might actually be consistent, but we have no hard guarantees
+ // currently to rely on that
+ return true;
+ } else {
+ // the remainder should include hdfs, kosmos, ceph, ...
+ // this also includes federated HDFS (viewfs).
+ return false;
+ }
} catch (IOException e) {
throw new UncheckedIOException(e);
}
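Context for the FlinkFileIO hunk: FileSystemKind and FileSystem#getKind() are among the APIs deprecated ahead of Flink 2.0, so isObjectStore() now infers object-store semantics from the filesystem URI scheme instead. Below is a minimal standalone sketch of the same decision logic; the class and method names are illustrative, and the scheme list mirrors the hunk above.

    import java.net.URI;
    import java.util.Locale;

    public class SchemeCheckSketch {
        // Mirrors the scheme-based check added above; the list is not exhaustive.
        static boolean isObjectStore(URI uri) {
            String scheme = uri.getScheme().toLowerCase(Locale.US);
            if (scheme.startsWith("s3")
                    || scheme.startsWith("emr")
                    || scheme.startsWith("oss")
                    || scheme.startsWith("wasb")
                    || scheme.startsWith("gs")) {
                // object stores: Amazon S3, Aliyun OSS, Azure Blob Storage, GCS
                return true;
            } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
                // file servers rather than file systems; no consistency guarantees
                return true;
            }
            // everything else: hdfs, viewfs, local file systems, ...
            return false;
        }

        public static void main(String[] args) {
            System.out.println(isObjectStore(URI.create("s3://bucket/warehouse")));    // true
            System.out.println(isObjectStore(URI.create("hdfs://nn:8020/warehouse"))); // false
        }
    }

One caveat of the new approach: getKind() let each FileSystem implementation self-report, while scheme matching is a heuristic, so an object store with an unlisted scheme would be classified as a regular file system.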
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 92adf5e049..8fc78c868b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -25,9 +25,7 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
import org.apache.flink.table.data.RowData;
@@ -64,27 +62,6 @@ public class FileStoreSourceReader
this.ioManager = ioManager;
}
- public FileStoreSourceReader(
- SourceReaderContext readerContext,
- TableRead tableRead,
- FileStoreSourceReaderMetrics metrics,
- IOManager ioManager,
- @Nullable Long limit,
- FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
- elementsQueue) {
- super(
- elementsQueue,
- () ->
- new FileStoreSourceSplitReader(
- tableRead, RecordLimiter.create(limit), metrics),
- (element, output, state) ->
- FlinkRecordsWithSplitIds.emitRecord(
- readerContext, element, output, state, metrics),
- readerContext.getConfiguration(),
- readerContext);
- this.ioManager = ioManager;
- }
-
@Override
public void start() {
// we request a split only if we did not get splits during the checkpoint restore
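Context for this hunk: the removed overload handed a caller-created FutureCompletingBlockingQueue to the SingleThreadMultiplexSourceReaderBase constructor, a variant Flink deprecated ahead of 2.0 in favor of constructors where the base class owns its queue. A hypothetical, self-contained sketch of that ownership shift (toy classes, not the actual Flink API):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    class ReaderBaseSketch {
        protected final BlockingQueue<String> elementsQueue;

        // Current shape: the base class creates and owns its queue.
        ReaderBaseSketch() {
            this.elementsQueue = new ArrayBlockingQueue<>(16);
        }

        // Removed shape, kept here as a comment: accepting a caller-supplied
        // queue exposed an internal detail of the reader machinery.
        // ReaderBaseSketch(BlockingQueue<String> elementsQueue) { ... }
    }

    public class QueueOwnershipSketch {
        public static void main(String[] args) throws InterruptedException {
            ReaderBaseSketch reader = new ReaderBaseSketch();
            reader.elementsQueue.put("split-1");
            System.out.println(reader.elementsQueue.take()); // split-1
        }
    }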
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 1f0bbca314..a8ffe3de56 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -58,7 +58,7 @@ public class AlignedSourceReader extends FileStoreSourceReader
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
elementsQueue) {
- super(readerContext, tableRead, metrics, ioManager, limit, elementsQueue);
+ super(readerContext, tableRead, metrics, ioManager, limit);
this.elementsQueue = elementsQueue;
this.nextCheckpointId = null;
}
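The AlignedSourceReader hunk is the call site of that removal: the subclass still receives the queue and, as the unchanged assignment above shows, keeps its own reference (presumably for its alignment checks); it merely stops forwarding the argument to super. A hypothetical sketch of the pattern:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class AlignedReaderSketch {
        private final BlockingQueue<Integer> elementsQueue;

        // The queue is no longer passed to the superclass; the subclass keeps
        // its own reference for the logic that inspects it.
        AlignedReaderSketch(BlockingQueue<Integer> elementsQueue) {
            this.elementsQueue = elementsQueue;
        }

        boolean drained() {
            return elementsQueue.isEmpty();
        }

        public static void main(String[] args) {
            System.out.println(new AlignedReaderSketch(new ArrayBlockingQueue<>(4)).drained()); // true
        }
    }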
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 2a855796d8..96334de3f8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -251,17 +251,20 @@ public class CatalogTableITCase extends CatalogITCaseBase {
sql("ALTER TABLE T SET ('snapshot.num-retained.min' = '18')");
sql("ALTER TABLE T SET ('manifest.format' = 'avro')");
- assertThat(sql("SHOW CREATE TABLE T$schemas").toString())
- .isEqualTo(
- "[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
- + " `schema_id` BIGINT NOT NULL,\n"
- + " `fields` VARCHAR(2147483647) NOT NULL,\n"
- + " `partition_keys` VARCHAR(2147483647) NOT
NULL,\n"
- + " `primary_keys` VARCHAR(2147483647) NOT
NULL,\n"
- + " `options` VARCHAR(2147483647) NOT NULL,\n"
- + " `comment` VARCHAR(2147483647),\n"
- + " `update_time` TIMESTAMP(3) NOT NULL\n"
- + ") ]]");
+ String actualResult = sql("SHOW CREATE TABLE T$schemas").toString();
+ String expectedResult =
+ "[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
+ + " `schema_id` BIGINT NOT NULL,\n"
+ + " `fields` VARCHAR(2147483647) NOT NULL,\n"
+ + " `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
+ + " `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
+ + " `options` VARCHAR(2147483647) NOT NULL,\n"
+ + " `comment` VARCHAR(2147483647),\n"
+ + " `update_time` TIMESTAMP(3) NOT NULL\n"
+ + ") ]]";
+ actualResult = actualResult.replace(" ", "").replace("\n", "");
+ expectedResult = expectedResult.replace(" ", "").replace("\n", "");
+ assertThat(actualResult).isEqualTo(expectedResult);
List<Row> result =
sql(
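Context for the CatalogTableITCase hunk: the exact-string assertion becomes a whitespace-insensitive comparison, presumably because SHOW CREATE TABLE pretty-printing differs between Flink versions. A standalone illustration of the normalization; note the trade-off that stripping every space also equates strings differing only inside quoted identifiers:

    public class NormalizeCompareSketch {
        static String normalize(String s) {
            return s.replace(" ", "").replace("\n", "");
        }

        public static void main(String[] args) {
            String pretty  = "CREATE TABLE `T` (\n  `id` BIGINT NOT NULL\n)";
            String compact = "CREATE TABLE `T` ( `id` BIGINT NOT NULL )";
            System.out.println(normalize(pretty).equals(normalize(compact))); // true
        }
    }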
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
index 8404d994fa..0c5d485af7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
@@ -18,10 +18,10 @@
package org.apache.paimon.flink.source;
+import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -67,7 +67,7 @@ public class IteratorSourcesITCase extends TestLogger {
"iterator source");
final List<RowData> result =
- DataStreamUtils.collectBoundedStream(stream, "Iterator Source Test");
+ IteratorUtils.toList(stream.executeAndCollect("Iterator Source Test"));
verifySequence(result, 1L, 1_000L);
}
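Context for this hunk: DataStreamUtils.collectBoundedStream is one of the deprecated test utilities being avoided for 2.0; the stable replacement is DataStream#executeAndCollect, which submits the job and returns a CloseableIterator that commons-collections' IteratorUtils.toList drains into a list. A self-contained sketch of the replacement call, assuming Flink and commons-collections on the classpath (job name and sequence bounds are illustrative):

    import java.util.List;

    import org.apache.commons.collections.IteratorUtils;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExecuteAndCollectSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Long> stream = env.fromSequence(1L, 100L);
            // executeAndCollect runs the pipeline and hands back an iterator;
            // the stream is bounded, so draining it with toList terminates.
            List<Long> result = IteratorUtils.toList(stream.executeAndCollect("collect sketch"));
            System.out.println(result.size()); // 100
        }
    }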
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
index cfc23a0a44..39939f7867 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.junit5.InjectClusterClient;
-import org.apache.flink.test.util.TestEnvironment;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -167,17 +166,12 @@ public final class MiniClusterWithClientExtension
.getOptional(CoreOptions.DEFAULT_PARALLELISM)
.orElse(internalMiniClusterExtension.getNumberSlots());
- TestEnvironment executionEnvironment =
- new TestEnvironment(
- internalMiniClusterExtension.getMiniCluster(), defaultParallelism, false);
- executionEnvironment.setAsContext();
TestStreamEnvironment.setAsContext(
internalMiniClusterExtension.getMiniCluster(), defaultParallelism);
}
private void unregisterEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
TestStreamEnvironment.unsetAsContext();
- TestEnvironment.unsetAsContext();
}
private MiniClusterClient createMiniClusterClient(
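Context for this hunk: TestEnvironment backs the DataSet API, which goes away in Flink 2.0, so only the streaming-side TestStreamEnvironment registration survives. A sketch of the remaining register/unregister lifecycle against a MiniCluster; the configuration values are illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
    import org.apache.flink.streaming.util.TestStreamEnvironment;

    public class StreamContextSketch {
        public static void main(String[] args) throws Exception {
            MiniCluster miniCluster =
                    new MiniCluster(
                            new MiniClusterConfiguration.Builder()
                                    .setConfiguration(new Configuration())
                                    .setNumTaskManagers(1)
                                    .setNumSlotsPerTaskManager(2)
                                    .build());
            miniCluster.start();
            try {
                // After this call, StreamExecutionEnvironment.getExecutionEnvironment()
                // resolves to the mini cluster; the DataSet-era
                // TestEnvironment.setAsContext() counterpart is gone.
                TestStreamEnvironment.setAsContext(miniCluster, 2);
                // ... run test jobs here ...
            } finally {
                TestStreamEnvironment.unsetAsContext();
                miniCluster.close();
            }
        }
    }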
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 9c3170f9a9..0eac2ed293 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.ReadWriteTableITCase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,6 +36,7 @@ import org.apache.flink.util.CloseableIterator;
import javax.annotation.Nullable;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -53,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test util for {@link ReadWriteTableITCase}. */
public class ReadWriteTableTestUtil {
- private static final Time TIME_OUT = Time.seconds(10);
+ private static final Duration TIME_OUT = Duration.ofSeconds(10);
public static final int DEFAULT_PARALLELISM = 2;
@@ -278,7 +278,7 @@ public class ReadWriteTableTestUtil {
try (BlockingIterator<Row, Row> iterator =
BlockingIterator.of(resultItr)) {
if (!expected.isEmpty()) {
List<Row> result =
- iterator.collect(expected.size(), TIME_OUT.getSize(), TIME_OUT.getUnit());
+ iterator.collect(expected.size(), TIME_OUT.getSeconds(), TimeUnit.SECONDS);
assertThat(toInsertOnlyRows(result))
.containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
}
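Context for the ReadWriteTableTestUtil hunks: org.apache.flink.api.common.time.Time is deprecated in favor of java.time.Duration for 2.0, and Duration has no getSize()/getUnit() pair, so the call site pins the unit explicitly. A standalone sketch of the migration; the await helper is hypothetical, standing in for BlockingIterator#collect:

    import java.time.Duration;
    import java.util.concurrent.TimeUnit;

    public class DurationTimeoutSketch {
        private static final Duration TIME_OUT = Duration.ofSeconds(10);

        // Hypothetical stand-in for an API that still takes (long, TimeUnit).
        static void await(long timeout, TimeUnit unit) {
            System.out.println("waiting up to " + timeout + " " + unit);
        }

        public static void main(String[] args) {
            // Before: await(TIME_OUT.getSize(), TIME_OUT.getUnit()) on Flink's Time.
            await(TIME_OUT.getSeconds(), TimeUnit.SECONDS);
        }
    }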