This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0699581 [core] Refactor DataSplit field order
9b0699581 is described below

commit 9b0699581d6247ae4787be1a536c29c95df0bf5f
Author: Jingsong <[email protected]>
AuthorDate: Fri Apr 26 13:10:14 2024 +0800

    [core] Refactor DataSplit field order
---
 .../java/org/apache/paimon/io/DataFileMeta.java    | 10 +--
 .../org/apache/paimon/table/source/DataSplit.java  | 87 ++++++++++------------
 .../snapshot/IncrementalStartingScanner.java       |  2 +-
 .../flink/source/FileStoreSourceSplitReader.java   |  2 +-
 .../paimon/flink/source/operator/ReadOperator.java |  2 +-
 .../scala/org/apache/paimon/spark/ScanHelper.scala |  2 +-
 6 files changed, 49 insertions(+), 56 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index c5e7c627f..dc7375d6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -287,14 +287,12 @@ public class DataFileMeta {
                 .toEpochMilli();
     }
 
-    public Optional<CoreOptions.FileFormatType> fileFormat() {
+    public String fileFormat() {
         String[] split = fileName.split("\\.");
-        try {
-            return Optional.of(
-                    CoreOptions.FileFormatType.valueOf(split[split.length - 
1].toUpperCase()));
-        } catch (IllegalArgumentException e) {
-            return Optional.empty();
+        if (split.length == 1) {
+            throw new RuntimeException("Can't find format from file: " + 
fileName());
         }
+        return split[split.length - 1];
     }
 
     public DataFileMeta upgrade(int newLevel) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 027aa0662..292fe42c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -48,17 +48,18 @@ public class DataSplit implements Split {
     private static final long serialVersionUID = 6L;
 
     private long snapshotId = 0;
-    private boolean isStreaming = false;
+    private BinaryRow partition;
+    private int bucket = -1;
+    private String bucketPath;
+
     private List<DataFileMeta> beforeFiles = new ArrayList<>();
     @Nullable private List<DeletionFile> beforeDeletionFiles;
 
-    private BinaryRow partition;
-    private int bucket = -1;
     private List<DataFileMeta> dataFiles;
     @Nullable private List<DeletionFile> dataDeletionFiles;
 
+    private boolean isStreaming = false;
     private boolean rawConvertible;
-    private String bucketPath;
 
     public DataSplit() {}
 
@@ -74,6 +75,10 @@ public class DataSplit implements Split {
         return bucket;
     }
 
+    public String bucketPath() {
+        return bucketPath;
+    }
+
     public List<DataFileMeta> beforeFiles() {
         return beforeFiles;
     }
@@ -99,11 +104,7 @@ public class DataSplit implements Split {
         return rawConvertible;
     }
 
-    public String getBucketPath() {
-        return bucketPath;
-    }
-
-    public OptionalLong getLatestFileCreationEpochMillis() {
+    public OptionalLong latestFileCreationEpochMillis() {
         return 
this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
     }
 
@@ -128,22 +129,14 @@ public class DataSplit implements Split {
         }
     }
 
-    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+    private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
         return new RawFile(
-                bucketPath + "/" + meta.fileName(),
+                bucketPath + "/" + file.fileName(),
                 0,
-                meta.fileSize(),
-                meta.fileFormat()
-                        .map(t -> t.toString().toLowerCase())
-                        .orElseThrow(
-                                () ->
-                                        new RuntimeException(
-                                                "Can't find format from file: "
-                                                        + bucketPath
-                                                        + "/"
-                                                        + meta.fileName())),
-                meta.schemaId(),
-                meta.rowCount());
+                file.fileSize(),
+                file.fileFormat(),
+                file.schemaId(),
+                file.rowCount());
     }
 
     @Override
@@ -181,30 +174,32 @@ public class DataSplit implements Split {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        DataSplit split = (DataSplit) o;
-        return bucket == split.bucket
-                && Objects.equals(partition, split.partition)
-                && Objects.equals(beforeFiles, split.beforeFiles)
-                && Objects.equals(beforeDeletionFiles, 
split.beforeDeletionFiles)
-                && Objects.equals(dataFiles, split.dataFiles)
-                && Objects.equals(dataDeletionFiles, split.dataDeletionFiles)
-                && isStreaming == split.isStreaming
-                && rawConvertible == split.rawConvertible
-                && Objects.equals(bucketPath, split.bucketPath);
+        DataSplit dataSplit = (DataSplit) o;
+        return snapshotId == dataSplit.snapshotId
+                && bucket == dataSplit.bucket
+                && isStreaming == dataSplit.isStreaming
+                && rawConvertible == dataSplit.rawConvertible
+                && Objects.equals(partition, dataSplit.partition)
+                && Objects.equals(bucketPath, dataSplit.bucketPath)
+                && Objects.equals(beforeFiles, dataSplit.beforeFiles)
+                && Objects.equals(beforeDeletionFiles, 
dataSplit.beforeDeletionFiles)
+                && Objects.equals(dataFiles, dataSplit.dataFiles)
+                && Objects.equals(dataDeletionFiles, 
dataSplit.dataDeletionFiles);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
+                snapshotId,
                 partition,
                 bucket,
+                bucketPath,
                 beforeFiles,
                 beforeDeletionFiles,
                 dataFiles,
                 dataDeletionFiles,
                 isStreaming,
-                rawConvertible,
-                bucketPath);
+                rawConvertible);
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -219,19 +214,20 @@ public class DataSplit implements Split {
         this.snapshotId = other.snapshotId;
         this.partition = other.partition;
         this.bucket = other.bucket;
+        this.bucketPath = other.bucketPath;
         this.beforeFiles = other.beforeFiles;
         this.beforeDeletionFiles = other.beforeDeletionFiles;
         this.dataFiles = other.dataFiles;
         this.dataDeletionFiles = other.dataDeletionFiles;
         this.isStreaming = other.isStreaming;
         this.rawConvertible = other.rawConvertible;
-        this.bucketPath = other.bucketPath;
     }
 
     public void serialize(DataOutputView out) throws IOException {
         out.writeLong(snapshotId);
         SerializationUtils.serializeBinaryRow(partition, out);
         out.writeInt(bucket);
+        out.writeUTF(bucketPath);
 
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
         out.writeInt(beforeFiles.size());
@@ -251,13 +247,13 @@ public class DataSplit implements Split {
         out.writeBoolean(isStreaming);
 
         out.writeBoolean(rawConvertible);
-        out.writeUTF(bucketPath);
     }
 
     public static DataSplit deserialize(DataInputView in) throws IOException {
         long snapshotId = in.readLong();
         BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
         int bucket = in.readInt();
+        String bucketPath = in.readUTF();
 
         DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
         int beforeNumber = in.readInt();
@@ -278,18 +274,17 @@ public class DataSplit implements Split {
 
         boolean isStreaming = in.readBoolean();
         boolean rawConvertible = in.readBoolean();
-        String bucketPath = in.readUTF();
 
         DataSplit.Builder builder =
                 builder()
                         .withSnapshot(snapshotId)
                         .withPartition(partition)
                         .withBucket(bucket)
+                        .withBucketPath(bucketPath)
                         .withBeforeFiles(beforeFiles)
                         .withDataFiles(dataFiles)
                         .isStreaming(isStreaming)
-                        .rawConvertible(rawConvertible)
-                        .withBucketPath(bucketPath);
+                        .rawConvertible(rawConvertible);
 
         if (beforeDeletionFiles != null) {
             builder.withBeforeDeletionFiles(beforeDeletionFiles);
@@ -324,6 +319,11 @@ public class DataSplit implements Split {
             return this;
         }
 
+        public Builder withBucketPath(String bucketPath) {
+            this.split.bucketPath = bucketPath;
+            return this;
+        }
+
         public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
             this.split.beforeFiles = new ArrayList<>(beforeFiles);
             return this;
@@ -354,16 +354,11 @@ public class DataSplit implements Split {
             return this;
         }
 
-        public Builder withBucketPath(String bucketPath) {
-            this.split.bucketPath = bucketPath;
-            return this;
-        }
-
         public DataSplit build() {
             checkArgument(split.partition != null);
             checkArgument(split.bucket != -1);
-            checkArgument(split.dataFiles != null);
             checkArgument(split.bucketPath != null);
+            checkArgument(split.dataFiles != null);
 
             DataSplit split = new DataSplit();
             split.assign(this.split);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 86705ce44..a0e0b5a3b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -68,7 +68,7 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
                                         // take it for false, because multiple 
snapshot read may
                                         // need merge for primary key table
                                         false,
-                                        split.getBucketPath(),
+                                        split.bucketPath(),
                                         split.deletionFiles().orElse(null)),
                                 k -> new ArrayList<>())
                         .addAll(split.dataFiles());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index c70c339d7..159d32748 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -181,7 +181,7 @@ public class FileStoreSourceSplitReader
         if (nextSplit.split() instanceof DataSplit) {
             long eventTime =
                     ((DataSplit) nextSplit.split())
-                            .getLatestFileCreationEpochMillis()
+                            .latestFileCreationEpochMillis()
                             .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
             metrics.recordSnapshotUpdate(eventTime);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index d84646c61..f3b5756a4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -93,7 +93,7 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
         // update metric when reading a new split
         long eventTime =
                 ((DataSplit) split)
-                        .getLatestFileCreationEpochMillis()
+                        .latestFileCreationEpochMillis()
                         .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
         sourceReaderMetrics.recordSnapshotUpdate(eventTime);
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 556bd82cd..8074322ea 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -117,7 +117,7 @@ trait ScanHelper {
       .withBucket(split.bucket())
       .withDataFiles(dataFiles.toList.asJava)
       .rawConvertible(split.rawConvertible())
-      .withBucketPath(split.getBucketPath)
+      .withBucketPath(split.bucketPath)
     if (deletionVectors) {
       builder.withDataDeletionFiles(deletionFiles.toList.asJava)
     }

Reply via email to