This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ed33839e7 Core, Flink: Add task-type field to JSON serde of scan task / Add JSON serde for StaticDataTask. (#9728)
9ed33839e7 is described below

commit 9ed33839e7a8820b5069cbb51cd54a80785d6a62
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Wed Jun 26 16:37:37 2024 +0800

    Core, Flink: Add task-type field to JSON serde of scan task / Add JSON serde for StaticDataTask. (#9728)
---
 .../java/org/apache/iceberg/DataTaskParser.java    |  81 ++++++
 .../org/apache/iceberg/FileScanTaskParser.java     |  39 ++-
 .../java/org/apache/iceberg/ScanTaskParser.java    | 105 ++++++++
 .../java/org/apache/iceberg/StaticDataTask.java    |  26 ++
 .../org/apache/iceberg/TestDataTaskParser.java     | 274 +++++++++++++++++++++
 .../org/apache/iceberg/TestFileScanTaskParser.java |  75 +++++-
 .../org/apache/iceberg/TestScanTaskParser.java     |  54 ++++
 .../flink/source/split/IcebergSourceSplit.java     |   6 +-
 .../flink/source/split/IcebergSourceSplit.java     |   6 +-
 .../flink/source/split/IcebergSourceSplit.java     |   6 +-
 10 files changed, 644 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DataTaskParser.java b/core/src/main/java/org/apache/iceberg/DataTaskParser.java
new file mode 100644
index 0000000000..428bcf15e7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataTaskParser.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class DataTaskParser {
+  private static final String SCHEMA = "schema";
+  private static final String PROJECTED_SCHEMA = "projection";
+  private static final String METADATA_FILE = "metadata-file";
+  private static final String ROWS = "rows";
+
+  private DataTaskParser() {}
+
+  static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws 
IOException {
+    Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(dataTask.schema(), generator);
+
+    generator.writeFieldName(PROJECTED_SCHEMA);
+    SchemaParser.toJson(dataTask.projectedSchema(), generator);
+
+    generator.writeFieldName(METADATA_FILE);
+    ContentFileParser.toJson(dataTask.metadataFile(), 
PartitionSpec.unpartitioned(), generator);
+
+    Preconditions.checkArgument(dataTask.tableRows() != null, "Invalid data 
task: null table rows");
+    generator.writeArrayFieldStart(ROWS);
+    for (StructLike row : dataTask.tableRows()) {
+      SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator);
+    }
+
+    generator.writeEndArray();
+  }
+
+  static StaticDataTask fromJson(JsonNode jsonNode) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for data 
task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for data task: non-object 
(%s)", jsonNode);
+
+    Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+    Schema projectedSchema = 
SchemaParser.fromJson(JsonUtil.get(PROJECTED_SCHEMA, jsonNode));
+    DataFile metadataFile =
+        (DataFile)
+            ContentFileParser.fromJson(
+                JsonUtil.get(METADATA_FILE, jsonNode), 
PartitionSpec.unpartitioned());
+
+    JsonNode rowsArray = JsonUtil.get(ROWS, jsonNode);
+    Preconditions.checkArgument(
+        rowsArray.isArray(), "Invalid JSON node for rows: non-array (%s)", 
rowsArray);
+
+    StructLike[] rows = new StructLike[rowsArray.size()];
+    for (int i = 0; i < rowsArray.size(); ++i) {
+      JsonNode rowNode = rowsArray.get(i);
+      rows[i] = (StructLike) SingleValueParser.fromJson(schema.asStruct(), 
rowNode);
+    }
+
+    return new StaticDataTask(metadataFile, schema, projectedSchema, rows);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
index 0a708f2668..a6ea41319f 100644
--- a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
+++ b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
@@ -40,16 +40,38 @@ public class FileScanTaskParser {
 
   private FileScanTaskParser() {}
 
+  /**
+   * Serialize file scan task to JSON string
+   *
+   * @deprecated will be removed in 1.7.0; use {@link ScanTaskParser#toJson(FileScanTask)} instead
+   */
+  @Deprecated
   public static String toJson(FileScanTask fileScanTask) {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
     return JsonUtil.generate(
-        generator -> FileScanTaskParser.toJson(fileScanTask, generator), 
false);
+        generator -> {
+          generator.writeStartObject();
+          toJson(fileScanTask, generator);
+          generator.writeEndObject();
+        },
+        false);
+  }
+
+  /**
+   * Deserialize file scan task from JSON string
+   *
+   * @deprecated will be removed in 1.7.0; use {@link ScanTaskParser#fromJson(String, boolean)}
+   *     instead
+   */
+  @Deprecated
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
+    return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
   }
 
-  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
-      throws IOException {
+  static void toJson(FileScanTask fileScanTask, JsonGenerator generator) 
throws IOException {
     Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
     Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
-    generator.writeStartObject();
 
     generator.writeFieldName(SCHEMA);
     SchemaParser.toJson(fileScanTask.schema(), generator);
@@ -78,16 +100,9 @@ public class FileScanTaskParser {
       generator.writeFieldName(RESIDUAL);
       ExpressionParser.toJson(fileScanTask.residual(), generator);
     }
-
-    generator.writeEndObject();
-  }
-
-  public static FileScanTask fromJson(String json, boolean caseSensitive) {
-    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
-    return JsonUtil.parse(json, node -> FileScanTaskParser.fromJson(node, 
caseSensitive));
   }
 
-  private static FileScanTask fromJson(JsonNode jsonNode, boolean 
caseSensitive) {
+  static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
     Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file 
scan task: null");
     Preconditions.checkArgument(
         jsonNode.isObject(), "Invalid JSON node for file scan task: non-object 
(%s)", jsonNode);
diff --git a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
new file mode 100644
index 0000000000..9447d0668a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+  private static final String TASK_TYPE = "task-type";
+
+  private enum TaskType {
+    FILE_SCAN_TASK("file-scan-task"),
+    DATA_TASK("data-task");
+
+    private final String value;
+
+    TaskType(String value) {
+      this.value = value;
+    }
+
+    public static TaskType fromTypeName(String value) {
+      Preconditions.checkArgument(
+          !Strings.isNullOrEmpty(value), "Invalid task type name: null or 
empty");
+      if (FILE_SCAN_TASK.typeName().equalsIgnoreCase(value)) {
+        return FILE_SCAN_TASK;
+      } else if (DATA_TASK.typeName().equalsIgnoreCase(value)) {
+        return DATA_TASK;
+      } else {
+        throw new IllegalArgumentException("Unknown task type: " + value);
+      }
+    }
+
+    public String typeName() {
+      return value;
+    }
+  }
+
+  private ScanTaskParser() {}
+
+  public static String toJson(FileScanTask fileScanTask) {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid scan task: 
null");
+    return JsonUtil.generate(generator -> toJson(fileScanTask, generator), 
false);
+  }
+
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for scan 
task: null");
+    return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
+  }
+
+  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
+      throws IOException {
+    generator.writeStartObject();
+
+    if (fileScanTask instanceof StaticDataTask) {
+      generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.typeName());
+      DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
+    } else if (fileScanTask instanceof BaseFileScanTask
+        || fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
+      generator.writeStringField(TASK_TYPE, 
TaskType.FILE_SCAN_TASK.typeName());
+      FileScanTaskParser.toJson(fileScanTask, generator);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported task type: " + 
fileScanTask.getClass().getCanonicalName());
+    }
+
+    generator.writeEndObject();
+  }
+
+  private static FileScanTask fromJson(JsonNode jsonNode, boolean 
caseSensitive) {
+    TaskType taskType = TaskType.FILE_SCAN_TASK;
+    String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode);
+    if (null != taskTypeStr) {
+      taskType = TaskType.fromTypeName(taskTypeStr);
+    }
+
+    switch (taskType) {
+      case FILE_SCAN_TASK:
+        return FileScanTaskParser.fromJson(jsonNode, caseSensitive);
+      case DATA_TASK:
+        return DataTaskParser.fromJson(jsonNode);
+      default:
+        throw new UnsupportedOperationException("Unsupported task type: " + 
taskType.typeName());
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
index cffb424279..f25ebd49c9 100644
--- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -64,6 +64,19 @@ class StaticDataTask implements DataTask {
     this.rows = rows;
   }
 
+  StaticDataTask(
+      DataFile metadataFile, Schema tableSchema, Schema projectedSchema, 
StructLike[] rows) {
+    this.tableSchema = tableSchema;
+    this.projectedSchema = projectedSchema;
+    this.metadataFile = metadataFile;
+    this.rows = rows;
+  }
+
+  @Override
+  public Schema schema() {
+    return tableSchema;
+  }
+
   @Override
   public List<DeleteFile> deletes() {
     return ImmutableList.of();
@@ -106,6 +119,19 @@ class StaticDataTask implements DataTask {
     return ImmutableList.of(this);
   }
 
+  Schema projectedSchema() {
+    return projectedSchema;
+  }
+
+  DataFile metadataFile() {
+    return metadataFile;
+  }
+
+  /** @return the table rows before projection */
+  StructLike[] tableRows() {
+    return rows;
+  }
+
   /** Implements {@link StructLike#get} for passing static rows. */
   static class Row implements StructLike, Serializable {
     public static Row of(Object... values) {
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
new file mode 100644
index 0000000000..5a3d119046
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {
+  // copied from SnapshotsTable to avoid making it package public
+  private static final Schema SNAPSHOT_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "committed_at", 
Types.TimestampType.withZone()),
+          Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+          Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+          Types.NestedField.optional(4, "operation", Types.StringType.get()),
+          Types.NestedField.optional(5, "manifest_list", 
Types.StringType.get()),
+          Types.NestedField.optional(
+              6,
+              "summary",
+              Types.MapType.ofRequired(7, 8, Types.StringType.get(), 
Types.StringType.get())));
+
+  // copied from SnapshotsTable to avoid making it package public
+  private static StaticDataTask.Row snapshotToRow(Snapshot snap) {
+    return StaticDataTask.Row.of(
+        snap.timestampMillis() * 1000,
+        snap.snapshotId(),
+        snap.parentId(),
+        snap.operation(),
+        snap.manifestListLocation(),
+        snap.summary());
+  }
+
+  @Test
+  public void nullCheck() throws Exception {
+    StringWriter writer = new StringWriter();
+    JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+    assertThatThrownBy(() -> DataTaskParser.toJson(null, generator))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid data task: null");
+
+    assertThatThrownBy(() -> DataTaskParser.toJson((StaticDataTask) 
createDataTask(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    assertThatThrownBy(() -> DataTaskParser.fromJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for data task: null");
+  }
+
+  @Test
+  public void invalidJsonNode() throws Exception {
+    String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode rootNode = mapper.reader().readTree(jsonStr);
+
+    assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for data task: non-object ");
+
+    assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("arr")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid JSON node for data task: non-object ");
+  }
+
+  @Test
+  public void missingFields() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+
+    String missingSchemaStr = "{}";
+    JsonNode missingSchemaNode = mapper.reader().readTree(missingSchemaStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingSchemaNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: schema");
+
+    String missingProjectionStr =
+        "{"
+            + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+            + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+            + "}";
+    JsonNode missingProjectionNode = 
mapper.reader().readTree(missingProjectionStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingProjectionNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: projection");
+
+    String missingMetadataFileStr =
+        "{"
+            + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+            + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]},"
+            + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+            + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+            + "}";
+    JsonNode missingMetadataFileNode = 
mapper.reader().readTree(missingMetadataFileStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingMetadataFileNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: metadata-file");
+
+    String missingTableRowsStr =
+        "{\"task-type\":\"data-task\","
+            + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+            + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+            + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+            + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+            + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+            + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+            + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+            + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+            + "\"value\":\"string\",\"value-required\":true}}]},"
+            + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+            + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+            + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+            + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+            + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+            + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+            + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+            + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+            + "\"value\":\"string\",\"value-required\":true}}]},"
+            + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+            + "\"file-path\":\"/tmp/metadata2.json\","
+            + "\"file-format\":\"METADATA\",\"partition\":{},"
+            + 
"\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0}"
+            + "}";
+    JsonNode missingTableRowsNode = 
mapper.reader().readTree(missingTableRowsStr);
+    assertThatThrownBy(() -> DataTaskParser.fromJson(missingTableRowsNode))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot parse missing field: rows");
+  }
+
+  @Test
+  public void roundTripSerde() {
+    StaticDataTask dataTask = (StaticDataTask) createDataTask();
+    String jsonStr = ScanTaskParser.toJson(dataTask);
+    assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
+    StaticDataTask deserializedTask = (StaticDataTask) 
ScanTaskParser.fromJson(jsonStr, true);
+    assertDataTaskEquals(dataTask, deserializedTask);
+  }
+
+  private DataTask createDataTask() {
+    Map<String, String> summary1 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "1",
+            "total-files-size", "10",
+            "total-data-files", "1",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    Map<String, String> summary2 =
+        ImmutableMap.of(
+            "added-data-files", "1",
+            "added-records", "1",
+            "added-files-size", "10",
+            "changed-partition-count", "1",
+            "total-records", "2",
+            "total-files-size", "20",
+            "total-data-files", "2",
+            "total-delete-files", "0",
+            "total-position-deletes", "0",
+            "total-equality-deletes", "0");
+
+    List<Snapshot> snapshots =
+        Arrays.asList(
+            new BaseSnapshot(
+                1L, 1L, null, 1234567890000L, "append", summary1, 1, 
"file:/tmp/manifest1.avro"),
+            new BaseSnapshot(
+                2L, 2L, 1L, 9876543210000L, "append", summary2, 1, 
"file:/tmp/manifest2.avro"));
+
+    return StaticDataTask.of(
+        Files.localInput("file:/tmp/metadata2.json"),
+        SNAPSHOT_SCHEMA,
+        SNAPSHOT_SCHEMA,
+        snapshots,
+        TestDataTaskParser::snapshotToRow);
+  }
+
+  private String snapshotsDataTaskJson() {
+    return "{\"task-type\":\"data-task\","
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+        + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+        + 
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+        + 
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+        + 
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+        + 
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+        + 
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+        + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+        + "\"value\":\"string\",\"value-required\":true}}]},"
+        + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+        + "\"file-path\":\"/tmp/metadata2.json\","
+        + "\"file-format\":\"METADATA\",\"partition\":{},"
+        + "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0},"
+        + 
"\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\",\"2\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest1.avro\","
+        + 
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + 
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + 
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"1\",\"10\",\"1\",\"0\",\"0\",\"0\"]}},"
+        + 
"{\"1\":\"2282-12-22T20:13:30+00:00\",\"2\":2,\"3\":1,\"4\":\"append\","
+        + "\"5\":\"file:/tmp/manifest2.avro\","
+        + 
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+        + 
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+        + "\"total-position-deletes\",\"total-equality-deletes\"],"
+        + 
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"2\",\"20\",\"2\",\"0\",\"0\",\"0\"]}}]}";
+  }
+
+  private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask 
actual) {
+    assertThat(actual.schema().asStruct())
+        .as("Schema should match")
+        .isEqualTo(expected.schema().asStruct());
+
+    assertThat(actual.projectedSchema().asStruct())
+        .as("Projected schema should match")
+        .isEqualTo(expected.projectedSchema().asStruct());
+
+    TestContentFileParser.assertContentFileEquals(
+        expected.metadataFile(), actual.metadataFile(), 
PartitionSpec.unpartitioned());
+
+    List<StructLike> expectedRows = Lists.newArrayList(expected.rows());
+    List<StructLike> actualRows = Lists.newArrayList(actual.rows());
+    assertThat(actualRows).hasSameSizeAs(expectedRows);
+
+    // all fields are primitive types or map
+    Schema schema = expected.schema();
+    for (int i = 0; i < expectedRows.size(); ++i) {
+      StructLike expectedRow = expectedRows.get(i);
+      StructLike actualRow = actualRows.get(i);
+      for (int pos = 0; pos < expectedRow.size(); ++pos) {
+        Class<?> javaClass = 
schema.columns().get(pos).type().typeId().javaClass();
+        assertThat(actualRow.get(pos, 
javaClass)).isEqualTo(expectedRow.get(pos, javaClass));
+      }
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
index 6e274c4811..137e789738 100644
--- a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
@@ -35,23 +35,64 @@ public class TestFileScanTaskParser {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid file scan task: null");
 
-    assertThatThrownBy(() -> FileScanTaskParser.fromJson(null, true))
+    assertThatThrownBy(() -> FileScanTaskParser.fromJson((String) null, true))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Invalid JSON string for file scan task: null");
+
+    assertThatThrownBy(() -> ScanTaskParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid scan task: null");
+
+    assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON string for scan task: null");
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
-  public void testParser(boolean caseSensitive) {
+  public void testFileScanTaskParser(boolean caseSensitive) {
     PartitionSpec spec = TestBase.SPEC;
-    FileScanTask fileScanTask = createScanTask(spec, caseSensitive);
+    FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
     String jsonStr = FileScanTaskParser.toJson(fileScanTask);
-    assertThat(jsonStr).isEqualTo(expectedFileScanTaskJson());
+    assertThat(jsonStr).isEqualTo(fileScanTaskJsonWithoutTaskType());
     FileScanTask deserializedTask = FileScanTaskParser.fromJson(jsonStr, 
caseSensitive);
     assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, 
caseSensitive);
   }
 
-  private FileScanTask createScanTask(PartitionSpec spec, boolean 
caseSensitive) {
+  /** Test backward compatibility where task-type field is absent from the JSON string */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testFileScanTaskParserWithoutTaskTypeField(boolean 
caseSensitive) {
+    PartitionSpec spec = TestBase.SPEC;
+    FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
+    FileScanTask deserializedTask =
+        FileScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(), 
caseSensitive);
+    assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, 
caseSensitive);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testScanTaskParser(boolean caseSensitive) {
+    PartitionSpec spec = TestBase.SPEC;
+    FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
+    String jsonStr = ScanTaskParser.toJson(fileScanTask);
+    assertThat(jsonStr).isEqualTo(fileScanTaskJson());
+    FileScanTask deserializedTask = ScanTaskParser.fromJson(jsonStr, 
caseSensitive);
+    assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, 
caseSensitive);
+  }
+
+  /** Test backward compatibility where task-type field is absent from the JSON string */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testScanTaskParserWithoutTaskTypeField(boolean caseSensitive) {
+    PartitionSpec spec = TestBase.SPEC;
+    FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
+    FileScanTask deserializedTask =
+        ScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(), 
caseSensitive);
+    assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, 
caseSensitive);
+  }
+
+  private FileScanTask createFileScanTask(PartitionSpec spec, boolean 
caseSensitive) {
     ResidualEvaluator residualEvaluator;
     if (spec.isUnpartitioned()) {
       residualEvaluator = 
ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
@@ -67,7 +108,7 @@ public class TestFileScanTaskParser {
         residualEvaluator);
   }
 
-  private String expectedFileScanTaskJson() {
+  private String fileScanTaskJsonWithoutTaskType() {
     return "{\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":["
         + "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
         + 
"{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
@@ -86,6 +127,26 @@ public class TestFileScanTaskParser {
         + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}";
   }
 
+  private String fileScanTaskJson() {
+    return "{\"task-type\":\"file-scan-task\","
+        + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":["
+        + "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+        + 
"{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+        + "\"spec\":{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\","
+        + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]},"
+        + 
"\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+        + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},"
+        + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+        + "\"start\":0,\"length\":10,"
+        + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+        + 
"\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+        + 
"\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1},"
+        + 
"{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/data-a2-deletes.parquet\","
+        + 
"\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,"
+        + "\"record-count\":1,\"equality-ids\":[1],\"sort-order-id\":0}],"
+        + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}";
+  }
+
   private static void assertFileScanTaskEquals(
       FileScanTask expected, FileScanTask actual, PartitionSpec spec, boolean 
caseSensitive) {
     TestContentFileParser.assertContentFileEquals(expected.file(), 
actual.file(), spec);
@@ -95,7 +156,7 @@ public class TestFileScanTaskParser {
           expected.deletes().get(pos), actual.deletes().get(pos), spec);
     }
 
-    assertThat(expected.schema().sameSchema(actual.schema())).as("Schema 
should match").isTrue();
+    
assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct());
     assertThat(actual.spec()).isEqualTo(expected.spec());
     assertThat(
             ExpressionUtil.equivalent(
diff --git a/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java b/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java
new file mode 100644
index 0000000000..aad8751498
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestScanTaskParser {
+  @Test
+  public void nullCheck() {
+    assertThatThrownBy(() -> ScanTaskParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid scan task: null");
+
+    assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON string for scan task: null");
+  }
+
+  @Test
+  public void invalidTaskType() {
+    String jsonStr = "{\"task-type\":\"junk\"}";
+    assertThatThrownBy(() -> ScanTaskParser.fromJson(jsonStr, true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Unknown task type: junk");
+  }
+
+  @Test
+  public void unsupportedTask() {
+    FileScanTask mockTask = Mockito.mock(FileScanTask.class);
+    assertThatThrownBy(() -> ScanTaskParser.toJson(mockTask))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining(
+            "Unsupported task type: 
org.apache.iceberg.FileScanTask$MockitoMock$");
+  }
+}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc..344f64833b 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
       out.writeInt(fileScanTasks.size());
 
       for (FileScanTask fileScanTask : fileScanTasks) {
-        String taskJson = FileScanTaskParser.toJson(fileScanTask);
+        String taskJson = ScanTaskParser.toJson(fileScanTask);
         writeTaskJson(out, taskJson, version);
       }
 
@@ -199,7 +199,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
       String taskJson = readTaskJson(in, version);
-      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc..344f64833b 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
       out.writeInt(fileScanTasks.size());
 
       for (FileScanTask fileScanTask : fileScanTasks) {
-        String taskJson = FileScanTaskParser.toJson(fileScanTask);
+        String taskJson = ScanTaskParser.toJson(fileScanTask);
         writeTaskJson(out, taskJson, version);
       }
 
@@ -199,7 +199,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
       String taskJson = readTaskJson(in, version);
-      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc..344f64833b 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
       out.writeInt(fileScanTasks.size());
 
       for (FileScanTask fileScanTask : fileScanTasks) {
-        String taskJson = FileScanTaskParser.toJson(fileScanTask);
+        String taskJson = ScanTaskParser.toJson(fileScanTask);
         writeTaskJson(out, taskJson, version);
       }
 
@@ -199,7 +199,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
       String taskJson = readTaskJson(in, version);
-      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
 


Reply via email to