This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 623b9b7238 Flink: switch to FileScanTaskParser for JSON serialization of IcebergSourceSplit (#7978)
623b9b7238 is described below
commit 623b9b7238fed229853049b132fe06c0a6665c20
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Fri Jul 7 21:44:54 2023 -0700
Flink: switch to FileScanTaskParser for JSON serialization of IcebergSourceSplit (#7978)
---
.../apache/iceberg/flink/source/IcebergSource.java | 4 +-
.../IcebergEnumeratorStateSerializer.java | 10 ++--
.../flink/source/split/IcebergSourceSplit.java | 54 ++++++++++++++++++++++
.../source/split/IcebergSourceSplitSerializer.java | 17 ++++---
.../TestIcebergEnumeratorStateSerializer.java | 2 +-
.../split/TestIcebergSourceSplitSerializer.java | 50 ++++++++++++++++++--
6 files changed, 117 insertions(+), 20 deletions(-)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index d3859452a2..f85f627726 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -169,12 +169,12 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
@Override
public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
- return IcebergSourceSplitSerializer.INSTANCE;
+ return new IcebergSourceSplitSerializer(scanContext.caseSensitive());
}
@Override
public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
- return IcebergEnumeratorStateSerializer.INSTANCE;
+ return new IcebergEnumeratorStateSerializer(scanContext.caseSensitive());
}
private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
index 9998bee99f..95d6db2cfb 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -35,9 +35,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
public class IcebergEnumeratorStateSerializer
implements SimpleVersionedSerializer<IcebergEnumeratorState> {
- public static final IcebergEnumeratorStateSerializer INSTANCE =
- new IcebergEnumeratorStateSerializer();
-
private static final int VERSION = 2;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
@@ -45,8 +42,11 @@ public class IcebergEnumeratorStateSerializer
private final IcebergEnumeratorPositionSerializer positionSerializer =
IcebergEnumeratorPositionSerializer.INSTANCE;
- private final IcebergSourceSplitSerializer splitSerializer =
- IcebergSourceSplitSerializer.INSTANCE;
+ private final IcebergSourceSplitSerializer splitSerializer;
+
+ public IcebergEnumeratorStateSerializer(boolean caseSensitive) {
+ this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);
+ }
@Override
public int getVersion() {
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 35f8ade984..e4bfbf1452 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -21,19 +21,28 @@ package org.apache.iceberg.flink.source.split;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
+import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.FileScanTaskParser;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@Internal
public class IcebergSourceSplit implements SourceSplit, Serializable {
private static final long serialVersionUID = 1L;
+ private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+ ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
private final CombinedScanTask task;
@@ -109,6 +118,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
if (serializedBytesCache == null) {
serializedBytesCache = InstantiationUtil.serializeObject(this);
}
+
return serializedBytesCache;
}
@@ -120,4 +130,48 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
throw new RuntimeException("Failed to deserialize the split.", e);
}
}
+
+ byte[] serializeV2() throws IOException {
+ if (serializedBytesCache == null) {
+ DataOutputSerializer out = SERIALIZER_CACHE.get();
+ Collection<FileScanTask> fileScanTasks = task.tasks();
+ Preconditions.checkArgument(
+ fileOffset >= 0 && fileOffset < fileScanTasks.size(),
+ "Invalid file offset: %s. Should be within the range of [0, %s)",
+ fileOffset,
+ fileScanTasks.size());
+
+ out.writeInt(fileOffset);
+ out.writeLong(recordOffset);
+ out.writeInt(fileScanTasks.size());
+
+ for (FileScanTask fileScanTask : fileScanTasks) {
+ String taskJson = FileScanTaskParser.toJson(fileScanTask);
+ out.writeUTF(taskJson);
+ }
+
+ serializedBytesCache = out.getCopyOfBuffer();
+ out.clear();
+ }
+
+ return serializedBytesCache;
+ }
+
+ static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
+ throws IOException {
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ int fileOffset = in.readInt();
+ long recordOffset = in.readLong();
+ int taskCount = in.readInt();
+
+ List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
+ for (int i = 0; i < taskCount; ++i) {
+ String taskJson = in.readUTF();
+ FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+ tasks.add(task);
+ }
+
+ CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
+ return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
+ }
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
index ee0f364e17..8c089819e7 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
@@ -22,14 +22,15 @@ import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-/**
- * TODO: use Java serialization for now. Will switch to more stable serializer from <a
- * href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
- */
@Internal
public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
- public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
+
+ private final boolean caseSensitive;
+
+ public IcebergSourceSplitSerializer(boolean caseSensitive) {
+ this.caseSensitive = caseSensitive;
+ }
@Override
public int getVersion() {
@@ -38,7 +39,7 @@ public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<I
@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
- return split.serializeV1();
+ return split.serializeV2();
}
@Override
@@ -46,6 +47,8 @@ public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<I
switch (version) {
case 1:
return IcebergSourceSplit.deserializeV1(serialized);
+ case 2:
+ return IcebergSourceSplit.deserializeV2(serialized, caseSensitive);
default:
throw new IOException(
String.format(
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
index 0082e25add..1d12d9f66a 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -40,7 +40,7 @@ public class TestIcebergEnumeratorStateSerializer {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
private final IcebergEnumeratorStateSerializer serializer =
- IcebergEnumeratorStateSerializer.INSTANCE;
+ new IcebergEnumeratorStateSerializer(true);
protected final int version;
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
index 046b0c31ce..cd778309f9 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
@@ -21,7 +21,9 @@ package org.apache.iceberg.flink.source.split;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -31,7 +33,7 @@ public class TestIcebergSourceSplitSerializer {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
- private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
+ private final IcebergSourceSplitSerializer serializer = new IcebergSourceSplitSerializer(true);
@Test
public void testLatestVersion() throws Exception {
@@ -81,6 +83,34 @@ public class TestIcebergSourceSplitSerializer {
}
}
+ @Test
+ public void testV2() throws Exception {
+ serializeAndDeserializeV2(1, 1);
+ serializeAndDeserializeV2(10, 2);
+ }
+
+ private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws Exception {
+ final List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(
+ TEMPORARY_FOLDER, splitCount, filesPerSplit);
+ for (IcebergSourceSplit split : splits) {
+ byte[] result = split.serializeV2();
+ IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV2(result, true);
+ assertSplitEquals(split, deserialized);
+ }
+ }
+
+ @Test
+ public void testDeserializeV1() throws Exception {
+ final List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+ for (IcebergSourceSplit split : splits) {
+ byte[] result = split.serializeV1();
+ IcebergSourceSplit deserialized = serializer.deserialize(1, result);
+ assertSplitEquals(split, deserialized);
+ }
+ }
+
@Test
public void testCheckpointedPosition() throws Exception {
final AtomicInteger index = new AtomicInteger();
@@ -90,9 +120,7 @@ public class TestIcebergSourceSplitSerializer {
split -> {
IcebergSourceSplit result;
if (index.get() % 2 == 0) {
- result =
- IcebergSourceSplit.fromCombinedScanTask(
- split.task(), index.get(), index.get());
+ result = IcebergSourceSplit.fromCombinedScanTask(split.task(), 1, 1);
} else {
result = split;
}
@@ -115,7 +143,19 @@ public class TestIcebergSourceSplitSerializer {
}
private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) {
- Assert.assertEquals(expected.splitId(), actual.splitId());
+ List<FileScanTask> expectedTasks = Lists.newArrayList(expected.task().tasks().iterator());
+ List<FileScanTask> actualTasks = Lists.newArrayList(actual.task().tasks().iterator());
+ Assert.assertEquals(expectedTasks.size(), actualTasks.size());
+ for (int i = 0; i < expectedTasks.size(); ++i) {
+ FileScanTask expectedTask = expectedTasks.get(i);
+ FileScanTask actualTask = actualTasks.get(i);
+ Assert.assertEquals(expectedTask.file().path(), actualTask.file().path());
+ Assert.assertEquals(expectedTask.sizeBytes(), actualTask.sizeBytes());
+ Assert.assertEquals(expectedTask.filesCount(), actualTask.filesCount());
+ Assert.assertEquals(expectedTask.start(), actualTask.start());
+ Assert.assertEquals(expectedTask.length(), actualTask.length());
+ }
+
Assert.assertEquals(expected.fileOffset(), actual.fileOffset());
Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
}