This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e4148ab89 [flink] Avoids miscellaneous deprecated APIs for 2.0 
Preview (#4590)
7e4148ab89 is described below

commit 7e4148ab890f1e62b448447216a9b9558bd025bc
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Nov 27 17:46:13 2024 +0800

    [flink] Avoids miscellaneous deprecated APIs for 2.0 Preview (#4590)
---
 .../java/org/apache/paimon/flink/FlinkFileIO.java  | 24 +++++++++++++++++++--
 .../paimon/flink/source/FileStoreSourceReader.java | 23 --------------------
 .../flink/source/align/AlignedSourceReader.java    |  2 +-
 .../apache/paimon/flink/CatalogTableITCase.java    | 25 ++++++++++++----------
 .../paimon/flink/source/IteratorSourcesITCase.java |  4 ++--
 .../flink/util/MiniClusterWithClientExtension.java |  6 ------
 .../paimon/flink/util/ReadWriteTableTestUtil.java  |  6 +++---
 7 files changed, 42 insertions(+), 48 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index 74512409bf..617d25125f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -29,10 +29,10 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.fs.FileSystemKind;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Locale;
 
 /** Flink {@link FileIO} to use {@link FileSystem}. */
 public class FlinkFileIO implements FileIO {
@@ -48,7 +48,27 @@ public class FlinkFileIO implements FileIO {
     @Override
     public boolean isObjectStore() {
         try {
-            return path.getFileSystem().getKind() != 
FileSystemKind.FILE_SYSTEM;
+            FileSystem fs = path.getFileSystem();
+            String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);
+
+            if (scheme.startsWith("s3")
+                    || scheme.startsWith("emr")
+                    || scheme.startsWith("oss")
+                    || scheme.startsWith("wasb")
+                    || scheme.startsWith("gs")) {
+                // the Amazon S3 storage or Aliyun OSS storage or Azure Blob 
Storage
+                // or Google Cloud Storage
+                return true;
+            } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+                // file servers instead of file systems
+                // they might actually be consistent, but we have no hard 
guarantees
+                // currently to rely on that
+                return true;
+            } else {
+                // the remainder should include hdfs, kosmos, ceph, ...
+                // this also includes federated HDFS (viewfs).
+                return false;
+            }
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 92adf5e049..8fc78c868b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -25,9 +25,7 @@ import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
 import org.apache.flink.table.data.RowData;
 
@@ -64,27 +62,6 @@ public class FileStoreSourceReader
         this.ioManager = ioManager;
     }
 
-    public FileStoreSourceReader(
-            SourceReaderContext readerContext,
-            TableRead tableRead,
-            FileStoreSourceReaderMetrics metrics,
-            IOManager ioManager,
-            @Nullable Long limit,
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
-                    elementsQueue) {
-        super(
-                elementsQueue,
-                () ->
-                        new FileStoreSourceSplitReader(
-                                tableRead, RecordLimiter.create(limit), 
metrics),
-                (element, output, state) ->
-                        FlinkRecordsWithSplitIds.emitRecord(
-                                readerContext, element, output, state, 
metrics),
-                readerContext.getConfiguration(),
-                readerContext);
-        this.ioManager = ioManager;
-    }
-
     @Override
     public void start() {
         // we request a split only if we did not get splits during the 
checkpoint restore
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 1f0bbca314..a8ffe3de56 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -58,7 +58,7 @@ public class AlignedSourceReader extends FileStoreSourceReader
             @Nullable Long limit,
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
                     elementsQueue) {
-        super(readerContext, tableRead, metrics, ioManager, limit, 
elementsQueue);
+        super(readerContext, tableRead, metrics, ioManager, limit);
         this.elementsQueue = elementsQueue;
         this.nextCheckpointId = null;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 2a855796d8..96334de3f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -251,17 +251,20 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
         sql("ALTER TABLE T SET ('snapshot.num-retained.min' = '18')");
         sql("ALTER TABLE T SET ('manifest.format' = 'avro')");
 
-        assertThat(sql("SHOW CREATE TABLE T$schemas").toString())
-                .isEqualTo(
-                        "[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
-                                + "  `schema_id` BIGINT NOT NULL,\n"
-                                + "  `fields` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `partition_keys` VARCHAR(2147483647) NOT 
NULL,\n"
-                                + "  `primary_keys` VARCHAR(2147483647) NOT 
NULL,\n"
-                                + "  `options` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `comment` VARCHAR(2147483647),\n"
-                                + "  `update_time` TIMESTAMP(3) NOT NULL\n"
-                                + ") ]]");
+        String actualResult = sql("SHOW CREATE TABLE T$schemas").toString();
+        String expectedResult =
+                "[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
+                        + "  `schema_id` BIGINT NOT NULL,\n"
+                        + "  `fields` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `options` VARCHAR(2147483647) NOT NULL,\n"
+                        + "  `comment` VARCHAR(2147483647),\n"
+                        + "  `update_time` TIMESTAMP(3) NOT NULL\n"
+                        + ") ]]";
+        actualResult = actualResult.replace(" ", "").replace("\n", "");
+        expectedResult = expectedResult.replace(" ", "").replace("\n", "");
+        assertThat(actualResult).isEqualTo(expectedResult);
 
         List<Row> result =
                 sql(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
index 8404d994fa..0c5d485af7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.commons.collections.IteratorUtils;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -67,7 +67,7 @@ public class IteratorSourcesITCase extends TestLogger {
                         "iterator source");
 
         final List<RowData> result =
-                DataStreamUtils.collectBoundedStream(stream, "Iterator Source 
Test");
+                IteratorUtils.toList(stream.executeAndCollect("Iterator Source 
Test"));
 
         verifySequence(result, 1L, 1_000L);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
index cfc23a0a44..39939f7867 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/MiniClusterWithClientExtension.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.junit5.InjectClusterClient;
-import org.apache.flink.test.util.TestEnvironment;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -167,17 +166,12 @@ public final class MiniClusterWithClientExtension
                         .getOptional(CoreOptions.DEFAULT_PARALLELISM)
                         .orElse(internalMiniClusterExtension.getNumberSlots());
 
-        TestEnvironment executionEnvironment =
-                new TestEnvironment(
-                        internalMiniClusterExtension.getMiniCluster(), 
defaultParallelism, false);
-        executionEnvironment.setAsContext();
         TestStreamEnvironment.setAsContext(
                 internalMiniClusterExtension.getMiniCluster(), 
defaultParallelism);
     }
 
     private void unregisterEnv(InternalMiniClusterExtension 
internalMiniClusterExtension) {
         TestStreamEnvironment.unsetAsContext();
-        TestEnvironment.unsetAsContext();
     }
 
     private MiniClusterClient createMiniClusterClient(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 9c3170f9a9..0eac2ed293 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.ReadWriteTableITCase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,6 +36,7 @@ import org.apache.flink.util.CloseableIterator;
 import javax.annotation.Nullable;
 
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -53,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test util for {@link ReadWriteTableITCase}. */
 public class ReadWriteTableTestUtil {
 
-    private static final Time TIME_OUT = Time.seconds(10);
+    private static final Duration TIME_OUT = Duration.ofSeconds(10);
 
     public static final int DEFAULT_PARALLELISM = 2;
 
@@ -278,7 +278,7 @@ public class ReadWriteTableTestUtil {
         try (BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(resultItr)) {
             if (!expected.isEmpty()) {
                 List<Row> result =
-                        iterator.collect(expected.size(), TIME_OUT.getSize(), 
TIME_OUT.getUnit());
+                        iterator.collect(expected.size(), 
TIME_OUT.getSeconds(), TimeUnit.SECONDS);
                 assertThat(toInsertOnlyRows(result))
                         
.containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
             }

Reply via email to