This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 623b9b7238 Flink: switch to FileScanTaskParser for JSON serialization of IcebergSourceSplit (#7978)
623b9b7238 is described below

commit 623b9b7238fed229853049b132fe06c0a6665c20
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Fri Jul 7 21:44:54 2023 -0700

    Flink: switch to FileScanTaskParser for JSON serialization of IcebergSourceSplit (#7978)
---
 .../apache/iceberg/flink/source/IcebergSource.java |  4 +-
 .../IcebergEnumeratorStateSerializer.java          | 10 ++--
 .../flink/source/split/IcebergSourceSplit.java     | 54 ++++++++++++++++++++++
 .../source/split/IcebergSourceSplitSerializer.java | 17 ++++---
 .../TestIcebergEnumeratorStateSerializer.java      |  2 +-
 .../split/TestIcebergSourceSplitSerializer.java    | 50 ++++++++++++++++++--
 6 files changed, 117 insertions(+), 20 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index d3859452a2..f85f627726 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -169,12 +169,12 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
 
   @Override
   public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
-    return IcebergSourceSplitSerializer.INSTANCE;
+    return new IcebergSourceSplitSerializer(scanContext.caseSensitive());
   }
 
   @Override
   public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
-    return IcebergEnumeratorStateSerializer.INSTANCE;
+    return new IcebergEnumeratorStateSerializer(scanContext.caseSensitive());
   }
 
   private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
index 9998bee99f..95d6db2cfb 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/IcebergEnumeratorStateSerializer.java
@@ -35,9 +35,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 public class IcebergEnumeratorStateSerializer
     implements SimpleVersionedSerializer<IcebergEnumeratorState> {
 
-  public static final IcebergEnumeratorStateSerializer INSTANCE =
-      new IcebergEnumeratorStateSerializer();
-
   private static final int VERSION = 2;
 
   private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
@@ -45,8 +42,11 @@ public class IcebergEnumeratorStateSerializer
 
   private final IcebergEnumeratorPositionSerializer positionSerializer =
       IcebergEnumeratorPositionSerializer.INSTANCE;
-  private final IcebergSourceSplitSerializer splitSerializer =
-      IcebergSourceSplitSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer;
+
+  public IcebergEnumeratorStateSerializer(boolean caseSensitive) {
+    this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);
+  }
 
   @Override
   public int getVersion() {
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 35f8ade984..e4bfbf1452 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -21,19 +21,28 @@ package org.apache.iceberg.flink.source.split;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.FileScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 @Internal
 public class IcebergSourceSplit implements SourceSplit, Serializable {
   private static final long serialVersionUID = 1L;
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));
 
   private final CombinedScanTask task;
 
@@ -109,6 +118,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
     if (serializedBytesCache == null) {
       serializedBytesCache = InstantiationUtil.serializeObject(this);
     }
+
     return serializedBytesCache;
   }
 
@@ -120,4 +130,48 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
       throw new RuntimeException("Failed to deserialize the split.", e);
     }
   }
+
+  byte[] serializeV2() throws IOException {
+    if (serializedBytesCache == null) {
+      DataOutputSerializer out = SERIALIZER_CACHE.get();
+      Collection<FileScanTask> fileScanTasks = task.tasks();
+      Preconditions.checkArgument(
+          fileOffset >= 0 && fileOffset < fileScanTasks.size(),
+          "Invalid file offset: %s. Should be within the range of [0, %s)",
+          fileOffset,
+          fileScanTasks.size());
+
+      out.writeInt(fileOffset);
+      out.writeLong(recordOffset);
+      out.writeInt(fileScanTasks.size());
+
+      for (FileScanTask fileScanTask : fileScanTasks) {
+        String taskJson = FileScanTaskParser.toJson(fileScanTask);
+        out.writeUTF(taskJson);
+      }
+
+      serializedBytesCache = out.getCopyOfBuffer();
+      out.clear();
+    }
+
+    return serializedBytesCache;
+  }
+
+  static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
+      throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+    int fileOffset = in.readInt();
+    long recordOffset = in.readLong();
+    int taskCount = in.readInt();
+
+    List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
+    for (int i = 0; i < taskCount; ++i) {
+      String taskJson = in.readUTF();
+      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      tasks.add(task);
+    }
+
+    CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
+    return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
+  }
 }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
index ee0f364e17..8c089819e7 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
@@ -22,14 +22,15 @@ import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-/**
- * TODO: use Java serialization for now. Will switch to more stable serializer from <a
- * href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
- */
 @Internal
 public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
-  public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
-  private static final int VERSION = 1;
+  private static final int VERSION = 2;
+
+  private final boolean caseSensitive;
+
+  public IcebergSourceSplitSerializer(boolean caseSensitive) {
+    this.caseSensitive = caseSensitive;
+  }
 
   @Override
   public int getVersion() {
@@ -38,7 +39,7 @@ public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<I
 
   @Override
   public byte[] serialize(IcebergSourceSplit split) throws IOException {
-    return split.serializeV1();
+    return split.serializeV2();
   }
 
   @Override
@@ -46,6 +47,8 @@ public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<I
     switch (version) {
       case 1:
         return IcebergSourceSplit.deserializeV1(serialized);
+      case 2:
+        return IcebergSourceSplit.deserializeV2(serialized, caseSensitive);
       default:
         throw new IOException(
             String.format(
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
index 0082e25add..1d12d9f66a 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestIcebergEnumeratorStateSerializer.java
@@ -40,7 +40,7 @@ public class TestIcebergEnumeratorStateSerializer {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   private final IcebergEnumeratorStateSerializer serializer =
-      IcebergEnumeratorStateSerializer.INSTANCE;
+      new IcebergEnumeratorStateSerializer(true);
 
   protected final int version;
 
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
index 046b0c31ce..cd778309f9 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
@@ -21,7 +21,9 @@ package org.apache.iceberg.flink.source.split;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -31,7 +33,7 @@ public class TestIcebergSourceSplitSerializer {
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
-  private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer serializer = new IcebergSourceSplitSerializer(true);
 
   @Test
   public void testLatestVersion() throws Exception {
@@ -81,6 +83,34 @@ public class TestIcebergSourceSplitSerializer {
     }
   }
 
+  @Test
+  public void testV2() throws Exception {
+    serializeAndDeserializeV2(1, 1);
+    serializeAndDeserializeV2(10, 2);
+  }
+
+  private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws Exception {
+    final List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(
+            TEMPORARY_FOLDER, splitCount, filesPerSplit);
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = split.serializeV2();
+      IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV2(result, true);
+      assertSplitEquals(split, deserialized);
+    }
+  }
+
+  @Test
+  public void testDeserializeV1() throws Exception {
+    final List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = split.serializeV1();
+      IcebergSourceSplit deserialized = serializer.deserialize(1, result);
+      assertSplitEquals(split, deserialized);
+    }
+  }
+
   @Test
   public void testCheckpointedPosition() throws Exception {
     final AtomicInteger index = new AtomicInteger();
@@ -90,9 +120,7 @@ public class TestIcebergSourceSplitSerializer {
                 split -> {
                   IcebergSourceSplit result;
                   if (index.get() % 2 == 0) {
-                    result =
-                        IcebergSourceSplit.fromCombinedScanTask(
-                            split.task(), index.get(), index.get());
+                    result = IcebergSourceSplit.fromCombinedScanTask(split.task(), 1, 1);
                   } else {
                     result = split;
                   }
@@ -115,7 +143,19 @@ public class TestIcebergSourceSplitSerializer {
   }
 
   private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) {
-    Assert.assertEquals(expected.splitId(), actual.splitId());
+    List<FileScanTask> expectedTasks = Lists.newArrayList(expected.task().tasks().iterator());
+    List<FileScanTask> actualTasks = Lists.newArrayList(actual.task().tasks().iterator());
+    Assert.assertEquals(expectedTasks.size(), actualTasks.size());
+    for (int i = 0; i < expectedTasks.size(); ++i) {
+      FileScanTask expectedTask = expectedTasks.get(i);
+      FileScanTask actualTask = actualTasks.get(i);
+      Assert.assertEquals(expectedTask.file().path(), actualTask.file().path());
+      Assert.assertEquals(expectedTask.sizeBytes(), actualTask.sizeBytes());
+      Assert.assertEquals(expectedTask.filesCount(), actualTask.filesCount());
+      Assert.assertEquals(expectedTask.start(), actualTask.start());
+      Assert.assertEquals(expectedTask.length(), actualTask.length());
+    }
+
     Assert.assertEquals(expected.fileOffset(), actual.fileOffset());
     Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
   }

Reply via email to