This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0699581 [core] Refactor DataSplit field order
9b0699581 is described below
commit 9b0699581d6247ae4787be1a536c29c95df0bf5f
Author: Jingsong <[email protected]>
AuthorDate: Fri Apr 26 13:10:14 2024 +0800
[core] Refactor DataSplit field order
---
.../java/org/apache/paimon/io/DataFileMeta.java | 10 +--
.../org/apache/paimon/table/source/DataSplit.java | 87 ++++++++++------------
.../snapshot/IncrementalStartingScanner.java | 2 +-
.../flink/source/FileStoreSourceSplitReader.java | 2 +-
.../paimon/flink/source/operator/ReadOperator.java | 2 +-
.../scala/org/apache/paimon/spark/ScanHelper.scala | 2 +-
6 files changed, 49 insertions(+), 56 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index c5e7c627f..dc7375d6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -287,14 +287,12 @@ public class DataFileMeta {
.toEpochMilli();
}
- public Optional<CoreOptions.FileFormatType> fileFormat() {
+ public String fileFormat() {
String[] split = fileName.split("\\.");
- try {
- return Optional.of(
- CoreOptions.FileFormatType.valueOf(split[split.length - 1].toUpperCase()));
- } catch (IllegalArgumentException e) {
- return Optional.empty();
+ if (split.length == 1) {
+ throw new RuntimeException("Can't find format from file: " + fileName());
}
+ return split[split.length - 1];
}
public DataFileMeta upgrade(int newLevel) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 027aa0662..292fe42c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -48,17 +48,18 @@ public class DataSplit implements Split {
private static final long serialVersionUID = 6L;
private long snapshotId = 0;
- private boolean isStreaming = false;
+ private BinaryRow partition;
+ private int bucket = -1;
+ private String bucketPath;
+
private List<DataFileMeta> beforeFiles = new ArrayList<>();
@Nullable private List<DeletionFile> beforeDeletionFiles;
- private BinaryRow partition;
- private int bucket = -1;
private List<DataFileMeta> dataFiles;
@Nullable private List<DeletionFile> dataDeletionFiles;
+ private boolean isStreaming = false;
private boolean rawConvertible;
- private String bucketPath;
public DataSplit() {}
@@ -74,6 +75,10 @@ public class DataSplit implements Split {
return bucket;
}
+ public String bucketPath() {
+ return bucketPath;
+ }
+
public List<DataFileMeta> beforeFiles() {
return beforeFiles;
}
@@ -99,11 +104,7 @@ public class DataSplit implements Split {
return rawConvertible;
}
- public String getBucketPath() {
- return bucketPath;
- }
-
- public OptionalLong getLatestFileCreationEpochMillis() {
+ public OptionalLong latestFileCreationEpochMillis() {
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
}
@@ -128,22 +129,14 @@ public class DataSplit implements Split {
}
}
- private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+ private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
return new RawFile(
- bucketPath + "/" + meta.fileName(),
+ bucketPath + "/" + file.fileName(),
0,
- meta.fileSize(),
- meta.fileFormat()
- .map(t -> t.toString().toLowerCase())
- .orElseThrow(
- () ->
- new RuntimeException(
- "Can't find format from file: "
- + bucketPath
- + "/"
- + meta.fileName())),
- meta.schemaId(),
- meta.rowCount());
+ file.fileSize(),
+ file.fileFormat(),
+ file.schemaId(),
+ file.rowCount());
}
@Override
@@ -181,30 +174,32 @@ public class DataSplit implements Split {
if (o == null || getClass() != o.getClass()) {
return false;
}
- DataSplit split = (DataSplit) o;
- return bucket == split.bucket
- && Objects.equals(partition, split.partition)
- && Objects.equals(beforeFiles, split.beforeFiles)
- && Objects.equals(beforeDeletionFiles, split.beforeDeletionFiles)
- && Objects.equals(dataFiles, split.dataFiles)
- && Objects.equals(dataDeletionFiles, split.dataDeletionFiles)
- && isStreaming == split.isStreaming
- && rawConvertible == split.rawConvertible
- && Objects.equals(bucketPath, split.bucketPath);
+ DataSplit dataSplit = (DataSplit) o;
+ return snapshotId == dataSplit.snapshotId
+ && bucket == dataSplit.bucket
+ && isStreaming == dataSplit.isStreaming
+ && rawConvertible == dataSplit.rawConvertible
+ && Objects.equals(partition, dataSplit.partition)
+ && Objects.equals(bucketPath, dataSplit.bucketPath)
+ && Objects.equals(beforeFiles, dataSplit.beforeFiles)
+ && Objects.equals(beforeDeletionFiles, dataSplit.beforeDeletionFiles)
+ && Objects.equals(dataFiles, dataSplit.dataFiles)
+ && Objects.equals(dataDeletionFiles, dataSplit.dataDeletionFiles);
}
@Override
public int hashCode() {
return Objects.hash(
+ snapshotId,
partition,
bucket,
+ bucketPath,
beforeFiles,
beforeDeletionFiles,
dataFiles,
dataDeletionFiles,
isStreaming,
- rawConvertible,
- bucketPath);
+ rawConvertible);
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -219,19 +214,20 @@ public class DataSplit implements Split {
this.snapshotId = other.snapshotId;
this.partition = other.partition;
this.bucket = other.bucket;
+ this.bucketPath = other.bucketPath;
this.beforeFiles = other.beforeFiles;
this.beforeDeletionFiles = other.beforeDeletionFiles;
this.dataFiles = other.dataFiles;
this.dataDeletionFiles = other.dataDeletionFiles;
this.isStreaming = other.isStreaming;
this.rawConvertible = other.rawConvertible;
- this.bucketPath = other.bucketPath;
}
public void serialize(DataOutputView out) throws IOException {
out.writeLong(snapshotId);
SerializationUtils.serializeBinaryRow(partition, out);
out.writeInt(bucket);
+ out.writeUTF(bucketPath);
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
out.writeInt(beforeFiles.size());
@@ -251,13 +247,13 @@ public class DataSplit implements Split {
out.writeBoolean(isStreaming);
out.writeBoolean(rawConvertible);
- out.writeUTF(bucketPath);
}
public static DataSplit deserialize(DataInputView in) throws IOException {
long snapshotId = in.readLong();
BinaryRow partition = SerializationUtils.deserializeBinaryRow(in);
int bucket = in.readInt();
+ String bucketPath = in.readUTF();
DataFileMetaSerializer dataFileSer = new DataFileMetaSerializer();
int beforeNumber = in.readInt();
@@ -278,18 +274,17 @@ public class DataSplit implements Split {
boolean isStreaming = in.readBoolean();
boolean rawConvertible = in.readBoolean();
- String bucketPath = in.readUTF();
DataSplit.Builder builder =
builder()
.withSnapshot(snapshotId)
.withPartition(partition)
.withBucket(bucket)
+ .withBucketPath(bucketPath)
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
- .rawConvertible(rawConvertible)
- .withBucketPath(bucketPath);
+ .rawConvertible(rawConvertible);
if (beforeDeletionFiles != null) {
builder.withBeforeDeletionFiles(beforeDeletionFiles);
@@ -324,6 +319,11 @@ public class DataSplit implements Split {
return this;
}
+ public Builder withBucketPath(String bucketPath) {
+ this.split.bucketPath = bucketPath;
+ return this;
+ }
+
public Builder withBeforeFiles(List<DataFileMeta> beforeFiles) {
this.split.beforeFiles = new ArrayList<>(beforeFiles);
return this;
@@ -354,16 +354,11 @@ public class DataSplit implements Split {
return this;
}
- public Builder withBucketPath(String bucketPath) {
- this.split.bucketPath = bucketPath;
- return this;
- }
-
public DataSplit build() {
checkArgument(split.partition != null);
checkArgument(split.bucket != -1);
- checkArgument(split.dataFiles != null);
checkArgument(split.bucketPath != null);
+ checkArgument(split.dataFiles != null);
DataSplit split = new DataSplit();
split.assign(this.split);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 86705ce44..a0e0b5a3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -68,7 +68,7 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
// take it for false, because multiple snapshot read may
// need merge for primary key table
false,
- split.getBucketPath(),
+ split.bucketPath(),
split.deletionFiles().orElse(null)),
k -> new ArrayList<>())
.addAll(split.dataFiles());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index c70c339d7..159d32748 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -181,7 +181,7 @@ public class FileStoreSourceSplitReader
if (nextSplit.split() instanceof DataSplit) {
long eventTime =
((DataSplit) nextSplit.split())
- .getLatestFileCreationEpochMillis()
+ .latestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
metrics.recordSnapshotUpdate(eventTime);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index d84646c61..f3b5756a4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -93,7 +93,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
// update metric when reading a new split
long eventTime =
((DataSplit) split)
- .getLatestFileCreationEpochMillis()
+ .latestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 556bd82cd..8074322ea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -117,7 +117,7 @@ trait ScanHelper {
.withBucket(split.bucket())
.withDataFiles(dataFiles.toList.asJava)
.rawConvertible(split.rawConvertible())
- .withBucketPath(split.getBucketPath)
+ .withBucketPath(split.bucketPath)
if (deletionVectors) {
builder.withDataDeletionFiles(deletionFiles.toList.asJava)
}