This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9ed33839e7 Core, Flink: Add task-type field to JSON serde of scan task / Add JSON serde for StaticDataTask. (#9728)
9ed33839e7 is described below
commit 9ed33839e7a8820b5069cbb51cd54a80785d6a62
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Wed Jun 26 16:37:37 2024 +0800
Core, Flink: Add task-type field to JSON serde of scan task / Add JSON serde for StaticDataTask. (#9728)
---
.../java/org/apache/iceberg/DataTaskParser.java | 81 ++++++
.../org/apache/iceberg/FileScanTaskParser.java | 39 ++-
.../java/org/apache/iceberg/ScanTaskParser.java | 105 ++++++++
.../java/org/apache/iceberg/StaticDataTask.java | 26 ++
.../org/apache/iceberg/TestDataTaskParser.java | 274 +++++++++++++++++++++
.../org/apache/iceberg/TestFileScanTaskParser.java | 75 +++++-
.../org/apache/iceberg/TestScanTaskParser.java | 54 ++++
.../flink/source/split/IcebergSourceSplit.java | 6 +-
.../flink/source/split/IcebergSourceSplit.java | 6 +-
.../flink/source/split/IcebergSourceSplit.java | 6 +-
10 files changed, 644 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/DataTaskParser.java
b/core/src/main/java/org/apache/iceberg/DataTaskParser.java
new file mode 100644
index 0000000000..428bcf15e7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataTaskParser.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class DataTaskParser {
+ private static final String SCHEMA = "schema";
+ private static final String PROJECTED_SCHEMA = "projection";
+ private static final String METADATA_FILE = "metadata-file";
+ private static final String ROWS = "rows";
+
+ private DataTaskParser() {}
+
+ static void toJson(StaticDataTask dataTask, JsonGenerator generator) throws
IOException {
+ Preconditions.checkArgument(dataTask != null, "Invalid data task: null");
+ Preconditions.checkArgument(generator != null, "Invalid JSON generator:
null");
+
+ generator.writeFieldName(SCHEMA);
+ SchemaParser.toJson(dataTask.schema(), generator);
+
+ generator.writeFieldName(PROJECTED_SCHEMA);
+ SchemaParser.toJson(dataTask.projectedSchema(), generator);
+
+ generator.writeFieldName(METADATA_FILE);
+ ContentFileParser.toJson(dataTask.metadataFile(),
PartitionSpec.unpartitioned(), generator);
+
+ Preconditions.checkArgument(dataTask.tableRows() != null, "Invalid data
task: null table rows");
+ generator.writeArrayFieldStart(ROWS);
+ for (StructLike row : dataTask.tableRows()) {
+ SingleValueParser.toJson(dataTask.schema().asStruct(), row, generator);
+ }
+
+ generator.writeEndArray();
+ }
+
+ static StaticDataTask fromJson(JsonNode jsonNode) {
+ Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for data
task: null");
+ Preconditions.checkArgument(
+ jsonNode.isObject(), "Invalid JSON node for data task: non-object
(%s)", jsonNode);
+
+ Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+ Schema projectedSchema =
SchemaParser.fromJson(JsonUtil.get(PROJECTED_SCHEMA, jsonNode));
+ DataFile metadataFile =
+ (DataFile)
+ ContentFileParser.fromJson(
+ JsonUtil.get(METADATA_FILE, jsonNode),
PartitionSpec.unpartitioned());
+
+ JsonNode rowsArray = JsonUtil.get(ROWS, jsonNode);
+ Preconditions.checkArgument(
+ rowsArray.isArray(), "Invalid JSON node for rows: non-array (%s)",
rowsArray);
+
+ StructLike[] rows = new StructLike[rowsArray.size()];
+ for (int i = 0; i < rowsArray.size(); ++i) {
+ JsonNode rowNode = rowsArray.get(i);
+ rows[i] = (StructLike) SingleValueParser.fromJson(schema.asStruct(),
rowNode);
+ }
+
+ return new StaticDataTask(metadataFile, schema, projectedSchema, rows);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
index 0a708f2668..a6ea41319f 100644
--- a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
+++ b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
@@ -40,16 +40,38 @@ public class FileScanTaskParser {
private FileScanTaskParser() {}
+ /**
+ * Serialize file scan task to JSON string
+ *
+ * @deprecated will be removed in 1.7.0; use {@link ScanTaskParser#toJson(FileScanTask)} instead
+ */
+ @Deprecated
public static String toJson(FileScanTask fileScanTask) {
+ Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task:
null");
return JsonUtil.generate(
- generator -> FileScanTaskParser.toJson(fileScanTask, generator),
false);
+ generator -> {
+ generator.writeStartObject();
+ toJson(fileScanTask, generator);
+ generator.writeEndObject();
+ },
+ false);
+ }
+
+ /**
+ * Deserialize file scan task from JSON string
+ *
+ * @deprecated will be removed in 1.7.0; use {@link ScanTaskParser#fromJson(String, boolean)}
+ * instead
+ */
+ @Deprecated
+ public static FileScanTask fromJson(String json, boolean caseSensitive) {
+ Preconditions.checkArgument(json != null, "Invalid JSON string for file
scan task: null");
+ return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
}
- private static void toJson(FileScanTask fileScanTask, JsonGenerator
generator)
- throws IOException {
+ static void toJson(FileScanTask fileScanTask, JsonGenerator generator)
throws IOException {
Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task:
null");
Preconditions.checkArgument(generator != null, "Invalid JSON generator:
null");
- generator.writeStartObject();
generator.writeFieldName(SCHEMA);
SchemaParser.toJson(fileScanTask.schema(), generator);
@@ -78,16 +100,9 @@ public class FileScanTaskParser {
generator.writeFieldName(RESIDUAL);
ExpressionParser.toJson(fileScanTask.residual(), generator);
}
-
- generator.writeEndObject();
- }
-
- public static FileScanTask fromJson(String json, boolean caseSensitive) {
- Preconditions.checkArgument(json != null, "Invalid JSON string for file
scan task: null");
- return JsonUtil.parse(json, node -> FileScanTaskParser.fromJson(node,
caseSensitive));
}
- private static FileScanTask fromJson(JsonNode jsonNode, boolean
caseSensitive) {
+ static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) {
Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file
scan task: null");
Preconditions.checkArgument(
jsonNode.isObject(), "Invalid JSON node for file scan task: non-object
(%s)", jsonNode);
diff --git a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
new file mode 100644
index 0000000000..9447d0668a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.util.JsonUtil;
+
+public class ScanTaskParser {
+ private static final String TASK_TYPE = "task-type";
+
+ private enum TaskType {
+ FILE_SCAN_TASK("file-scan-task"),
+ DATA_TASK("data-task");
+
+ private final String value;
+
+ TaskType(String value) {
+ this.value = value;
+ }
+
+ public static TaskType fromTypeName(String value) {
+ Preconditions.checkArgument(
+ !Strings.isNullOrEmpty(value), "Invalid task type name: null or
empty");
+ if (FILE_SCAN_TASK.typeName().equalsIgnoreCase(value)) {
+ return FILE_SCAN_TASK;
+ } else if (DATA_TASK.typeName().equalsIgnoreCase(value)) {
+ return DATA_TASK;
+ } else {
+ throw new IllegalArgumentException("Unknown task type: " + value);
+ }
+ }
+
+ public String typeName() {
+ return value;
+ }
+ }
+
+ private ScanTaskParser() {}
+
+ public static String toJson(FileScanTask fileScanTask) {
+ Preconditions.checkArgument(fileScanTask != null, "Invalid scan task:
null");
+ return JsonUtil.generate(generator -> toJson(fileScanTask, generator),
false);
+ }
+
+ public static FileScanTask fromJson(String json, boolean caseSensitive) {
+ Preconditions.checkArgument(json != null, "Invalid JSON string for scan
task: null");
+ return JsonUtil.parse(json, node -> fromJson(node, caseSensitive));
+ }
+
+ private static void toJson(FileScanTask fileScanTask, JsonGenerator
generator)
+ throws IOException {
+ generator.writeStartObject();
+
+ if (fileScanTask instanceof StaticDataTask) {
+ generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.typeName());
+ DataTaskParser.toJson((StaticDataTask) fileScanTask, generator);
+ } else if (fileScanTask instanceof BaseFileScanTask
+ || fileScanTask instanceof BaseFileScanTask.SplitScanTask) {
+ generator.writeStringField(TASK_TYPE,
TaskType.FILE_SCAN_TASK.typeName());
+ FileScanTaskParser.toJson(fileScanTask, generator);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported task type: " +
fileScanTask.getClass().getCanonicalName());
+ }
+
+ generator.writeEndObject();
+ }
+
+ private static FileScanTask fromJson(JsonNode jsonNode, boolean
caseSensitive) {
+ TaskType taskType = TaskType.FILE_SCAN_TASK;
+ String taskTypeStr = JsonUtil.getStringOrNull(TASK_TYPE, jsonNode);
+ if (null != taskTypeStr) {
+ taskType = TaskType.fromTypeName(taskTypeStr);
+ }
+
+ switch (taskType) {
+ case FILE_SCAN_TASK:
+ return FileScanTaskParser.fromJson(jsonNode, caseSensitive);
+ case DATA_TASK:
+ return DataTaskParser.fromJson(jsonNode);
+ default:
+ throw new UnsupportedOperationException("Unsupported task type: " +
taskType.typeName());
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
index cffb424279..f25ebd49c9 100644
--- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -64,6 +64,19 @@ class StaticDataTask implements DataTask {
this.rows = rows;
}
+ StaticDataTask(
+ DataFile metadataFile, Schema tableSchema, Schema projectedSchema,
StructLike[] rows) {
+ this.tableSchema = tableSchema;
+ this.projectedSchema = projectedSchema;
+ this.metadataFile = metadataFile;
+ this.rows = rows;
+ }
+
+ @Override
+ public Schema schema() {
+ return tableSchema;
+ }
+
@Override
public List<DeleteFile> deletes() {
return ImmutableList.of();
@@ -106,6 +119,19 @@ class StaticDataTask implements DataTask {
return ImmutableList.of(this);
}
+ Schema projectedSchema() {
+ return projectedSchema;
+ }
+
+ DataFile metadataFile() {
+ return metadataFile;
+ }
+
+ /** @return the table rows before projection */
+ StructLike[] tableRows() {
+ return rows;
+ }
+
/** Implements {@link StructLike#get} for passing static rows. */
static class Row implements StructLike, Serializable {
public static Row of(Object... values) {
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
new file mode 100644
index 0000000000..5a3d119046
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestDataTaskParser {
+ // copied from SnapshotsTable to avoid making it package public
+ private static final Schema SNAPSHOT_SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "committed_at",
Types.TimestampType.withZone()),
+ Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+ Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+ Types.NestedField.optional(4, "operation", Types.StringType.get()),
+ Types.NestedField.optional(5, "manifest_list",
Types.StringType.get()),
+ Types.NestedField.optional(
+ 6,
+ "summary",
+ Types.MapType.ofRequired(7, 8, Types.StringType.get(),
Types.StringType.get())));
+
+ // copied from SnapshotsTable to avoid making it package public
+ private static StaticDataTask.Row snapshotToRow(Snapshot snap) {
+ return StaticDataTask.Row.of(
+ snap.timestampMillis() * 1000,
+ snap.snapshotId(),
+ snap.parentId(),
+ snap.operation(),
+ snap.manifestListLocation(),
+ snap.summary());
+ }
+
+ @Test
+ public void nullCheck() throws Exception {
+ StringWriter writer = new StringWriter();
+ JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+ assertThatThrownBy(() -> DataTaskParser.toJson(null, generator))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid data task: null");
+
+ assertThatThrownBy(() -> DataTaskParser.toJson((StaticDataTask)
createDataTask(), null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid JSON generator: null");
+
+ assertThatThrownBy(() -> DataTaskParser.fromJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid JSON node for data task: null");
+ }
+
+ @Test
+ public void invalidJsonNode() throws Exception {
+ String jsonStr = "{\"str\":\"1\", \"arr\":[]}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode rootNode = mapper.reader().readTree(jsonStr);
+
+ assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("str")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid JSON node for data task: non-object ");
+
+ assertThatThrownBy(() -> DataTaskParser.fromJson(rootNode.get("arr")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid JSON node for data task: non-object ");
+ }
+
+ @Test
+ public void missingFields() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+
+ String missingSchemaStr = "{}";
+ JsonNode missingSchemaNode = mapper.reader().readTree(missingSchemaStr);
+ assertThatThrownBy(() -> DataTaskParser.fromJson(missingSchemaNode))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot parse missing field: schema");
+
+ String missingProjectionStr =
+ "{"
+ + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+ + "}";
+ JsonNode missingProjectionNode =
mapper.reader().readTree(missingProjectionStr);
+ assertThatThrownBy(() -> DataTaskParser.fromJson(missingProjectionNode))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot parse missing field: projection");
+
+ String missingMetadataFileStr =
+ "{"
+ + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]},"
+ + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]}"
+ + "}";
+ JsonNode missingMetadataFileNode =
mapper.reader().readTree(missingMetadataFileStr);
+ assertThatThrownBy(() -> DataTaskParser.fromJson(missingMetadataFileNode))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot parse missing field: metadata-file");
+
+ String missingTableRowsStr =
+ "{\"task-type\":\"data-task\","
+ + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+ +
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ +
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+ +
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ + "\"value\":\"string\",\"value-required\":true}}]},"
+ + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+ +
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ +
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+ +
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ + "\"value\":\"string\",\"value-required\":true}}]},"
+ + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+ + "\"file-path\":\"/tmp/metadata2.json\","
+ + "\"file-format\":\"METADATA\",\"partition\":{},"
+ +
"\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0}"
+ + "}";
+ JsonNode missingTableRowsNode =
mapper.reader().readTree(missingTableRowsStr);
+ assertThatThrownBy(() -> DataTaskParser.fromJson(missingTableRowsNode))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot parse missing field: rows");
+ }
+
+ @Test
+ public void roundTripSerde() {
+ StaticDataTask dataTask = (StaticDataTask) createDataTask();
+ String jsonStr = ScanTaskParser.toJson(dataTask);
+ assertThat(jsonStr).isEqualTo(snapshotsDataTaskJson());
+ StaticDataTask deserializedTask = (StaticDataTask)
ScanTaskParser.fromJson(jsonStr, true);
+ assertDataTaskEquals(dataTask, deserializedTask);
+ }
+
+ private DataTask createDataTask() {
+ Map<String, String> summary1 =
+ ImmutableMap.of(
+ "added-data-files", "1",
+ "added-records", "1",
+ "added-files-size", "10",
+ "changed-partition-count", "1",
+ "total-records", "1",
+ "total-files-size", "10",
+ "total-data-files", "1",
+ "total-delete-files", "0",
+ "total-position-deletes", "0",
+ "total-equality-deletes", "0");
+
+ Map<String, String> summary2 =
+ ImmutableMap.of(
+ "added-data-files", "1",
+ "added-records", "1",
+ "added-files-size", "10",
+ "changed-partition-count", "1",
+ "total-records", "2",
+ "total-files-size", "20",
+ "total-data-files", "2",
+ "total-delete-files", "0",
+ "total-position-deletes", "0",
+ "total-equality-deletes", "0");
+
+ List<Snapshot> snapshots =
+ Arrays.asList(
+ new BaseSnapshot(
+ 1L, 1L, null, 1234567890000L, "append", summary1, 1,
"file:/tmp/manifest1.avro"),
+ new BaseSnapshot(
+ 2L, 2L, 1L, 9876543210000L, "append", summary2, 1,
"file:/tmp/manifest2.avro"));
+
+ return StaticDataTask.of(
+ Files.localInput("file:/tmp/metadata2.json"),
+ SNAPSHOT_SCHEMA,
+ SNAPSHOT_SCHEMA,
+ snapshots,
+ TestDataTaskParser::snapshotToRow);
+ }
+
+ private String snapshotsDataTaskJson() {
+ return "{\"task-type\":\"data-task\","
+ + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+ +
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ +
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+ +
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ + "\"value\":\"string\",\"value-required\":true}}]},"
+ + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+ +
"\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"},"
+ +
"{\"id\":2,\"name\":\"snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ +
"{\"id\":3,\"name\":\"parent_id\",\"required\":false,\"type\":\"long\"},"
+ +
"{\"id\":4,\"name\":\"operation\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":5,\"name\":\"manifest_list\",\"required\":false,\"type\":\"string\"},"
+ +
"{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ + "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ + "\"value\":\"string\",\"value-required\":true}}]},"
+ + "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+ + "\"file-path\":\"/tmp/metadata2.json\","
+ + "\"file-format\":\"METADATA\",\"partition\":{},"
+ + "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0},"
+ +
"\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\",\"2\":1,\"4\":\"append\","
+ + "\"5\":\"file:/tmp/manifest1.avro\","
+ +
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+ +
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+ + "\"total-position-deletes\",\"total-equality-deletes\"],"
+ +
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"1\",\"10\",\"1\",\"0\",\"0\",\"0\"]}},"
+ +
"{\"1\":\"2282-12-22T20:13:30+00:00\",\"2\":2,\"3\":1,\"4\":\"append\","
+ + "\"5\":\"file:/tmp/manifest2.avro\","
+ +
"\"6\":{\"keys\":[\"added-data-files\",\"added-records\",\"added-files-size\",\"changed-partition-count\","
+ +
"\"total-records\",\"total-files-size\",\"total-data-files\",\"total-delete-files\","
+ + "\"total-position-deletes\",\"total-equality-deletes\"],"
+ +
"\"values\":[\"1\",\"1\",\"10\",\"1\",\"2\",\"20\",\"2\",\"0\",\"0\",\"0\"]}}]}";
+ }
+
+ private void assertDataTaskEquals(StaticDataTask expected, StaticDataTask
actual) {
+ assertThat(actual.schema().asStruct())
+ .as("Schema should match")
+ .isEqualTo(expected.schema().asStruct());
+
+ assertThat(actual.projectedSchema().asStruct())
+ .as("Projected schema should match")
+ .isEqualTo(expected.projectedSchema().asStruct());
+
+ TestContentFileParser.assertContentFileEquals(
+ expected.metadataFile(), actual.metadataFile(),
PartitionSpec.unpartitioned());
+
+ List<StructLike> expectedRows = Lists.newArrayList(expected.rows());
+ List<StructLike> actualRows = Lists.newArrayList(actual.rows());
+ assertThat(actualRows).hasSameSizeAs(expectedRows);
+
+ // all fields are primitive types or map
+ Schema schema = expected.schema();
+ for (int i = 0; i < expectedRows.size(); ++i) {
+ StructLike expectedRow = expectedRows.get(i);
+ StructLike actualRow = actualRows.get(i);
+ for (int pos = 0; pos < expectedRow.size(); ++pos) {
+ Class<?> javaClass =
schema.columns().get(pos).type().typeId().javaClass();
+ assertThat(actualRow.get(pos,
javaClass)).isEqualTo(expectedRow.get(pos, javaClass));
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
index 6e274c4811..137e789738 100644
--- a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
@@ -35,23 +35,64 @@ public class TestFileScanTaskParser {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid file scan task: null");
- assertThatThrownBy(() -> FileScanTaskParser.fromJson(null, true))
+ assertThatThrownBy(() -> FileScanTaskParser.fromJson((String) null, true))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid JSON string for file scan task: null");
+
+ assertThatThrownBy(() -> ScanTaskParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid scan task: null");
+
+ assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid JSON string for scan task: null");
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testParser(boolean caseSensitive) {
+ public void testFileScanTaskParser(boolean caseSensitive) {
PartitionSpec spec = TestBase.SPEC;
- FileScanTask fileScanTask = createScanTask(spec, caseSensitive);
+ FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
String jsonStr = FileScanTaskParser.toJson(fileScanTask);
- assertThat(jsonStr).isEqualTo(expectedFileScanTaskJson());
+ assertThat(jsonStr).isEqualTo(fileScanTaskJsonWithoutTaskType());
FileScanTask deserializedTask = FileScanTaskParser.fromJson(jsonStr,
caseSensitive);
assertFileScanTaskEquals(fileScanTask, deserializedTask, spec,
caseSensitive);
}
- private FileScanTask createScanTask(PartitionSpec spec, boolean
caseSensitive) {
+ /** Test backward compatibility where task-type field is absent from the JSON string */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFileScanTaskParserWithoutTaskTypeField(boolean
caseSensitive) {
+ PartitionSpec spec = TestBase.SPEC;
+ FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
+ FileScanTask deserializedTask =
+ FileScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(),
caseSensitive);
+ assertFileScanTaskEquals(fileScanTask, deserializedTask, spec,
caseSensitive);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testScanTaskParser(boolean caseSensitive) {
+ PartitionSpec spec = TestBase.SPEC;
+ FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
+ String jsonStr = ScanTaskParser.toJson(fileScanTask);
+ assertThat(jsonStr).isEqualTo(fileScanTaskJson());
+ FileScanTask deserializedTask = ScanTaskParser.fromJson(jsonStr,
caseSensitive);
+ assertFileScanTaskEquals(fileScanTask, deserializedTask, spec,
caseSensitive);
+ }
+
+ /** Test backward compatibility where task-type field is absent from the JSON string */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testScanTaskParserWithoutTaskTypeField(boolean caseSensitive) {
+ PartitionSpec spec = TestBase.SPEC;
+ FileScanTask fileScanTask = createFileScanTask(spec, caseSensitive);
+ FileScanTask deserializedTask =
+ ScanTaskParser.fromJson(fileScanTaskJsonWithoutTaskType(),
caseSensitive);
+ assertFileScanTaskEquals(fileScanTask, deserializedTask, spec,
caseSensitive);
+ }
+
+ private FileScanTask createFileScanTask(PartitionSpec spec, boolean
caseSensitive) {
ResidualEvaluator residualEvaluator;
if (spec.isUnpartitioned()) {
residualEvaluator =
ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
@@ -67,7 +108,7 @@ public class TestFileScanTaskParser {
residualEvaluator);
}
- private String expectedFileScanTaskJson() {
+ private String fileScanTaskJsonWithoutTaskType() {
return "{\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":["
+ "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+
"{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
@@ -86,6 +127,26 @@ public class TestFileScanTaskParser {
+ "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}";
}
+ private String fileScanTaskJson() {
+ return "{\"task-type\":\"file-scan-task\","
+ + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":["
+ + "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+ +
"{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+ + "\"spec\":{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\","
+ + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]},"
+ +
"\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+ + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},"
+ + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+ + "\"start\":0,\"length\":10,"
+ + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+ +
"\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+ +
"\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1},"
+ +
"{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/data-a2-deletes.parquet\","
+ +
"\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,"
+ + "\"record-count\":1,\"equality-ids\":[1],\"sort-order-id\":0}],"
+ + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}";
+ }
+
private static void assertFileScanTaskEquals(
FileScanTask expected, FileScanTask actual, PartitionSpec spec, boolean
caseSensitive) {
TestContentFileParser.assertContentFileEquals(expected.file(),
actual.file(), spec);
@@ -95,7 +156,7 @@ public class TestFileScanTaskParser {
expected.deletes().get(pos), actual.deletes().get(pos), spec);
}
- assertThat(expected.schema().sameSchema(actual.schema())).as("Schema
should match").isTrue();
+
assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct());
assertThat(actual.spec()).isEqualTo(expected.spec());
assertThat(
ExpressionUtil.equivalent(
diff --git a/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java
b/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java
new file mode 100644
index 0000000000..aad8751498
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestScanTaskParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestScanTaskParser {
+ @Test
+ public void nullCheck() {
+ assertThatThrownBy(() -> ScanTaskParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid scan task: null");
+
+ assertThatThrownBy(() -> ScanTaskParser.fromJson(null, true))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid JSON string for scan task: null");
+ }
+
+ @Test
+ public void invalidTaskType() {
+ String jsonStr = "{\"task-type\":\"junk\"}";
+ assertThatThrownBy(() -> ScanTaskParser.fromJson(jsonStr, true))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Unknown task type: junk");
+ }
+
+ @Test
+ public void unsupportedTask() {
+ FileScanTask mockTask = Mockito.mock(FileScanTask.class);
+ assertThatThrownBy(() -> ScanTaskParser.toJson(mockTask))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining(
+ "Unsupported task type: org.apache.iceberg.FileScanTask$MockitoMock$");
+ }
+}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc..344f64833b 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
out.writeInt(fileScanTasks.size());
for (FileScanTask fileScanTask : fileScanTasks) {
- String taskJson = FileScanTaskParser.toJson(fileScanTask);
+ String taskJson = ScanTaskParser.toJson(fileScanTask);
writeTaskJson(out, taskJson, version);
}
@@ -199,7 +199,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
for (int i = 0; i < taskCount; ++i) {
String taskJson = readTaskJson(in, version);
- FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+ FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
tasks.add(task);
}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc..344f64833b 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
out.writeInt(fileScanTasks.size());
for (FileScanTask fileScanTask : fileScanTasks) {
- String taskJson = FileScanTaskParser.toJson(fileScanTask);
+ String taskJson = ScanTaskParser.toJson(fileScanTask);
writeTaskJson(out, taskJson, version);
}
@@ -199,7 +199,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
for (int i = 0; i < taskCount; ++i) {
String taskJson = readTaskJson(in, version);
- FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+ FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
tasks.add(task);
}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index 44e37afcfc..344f64833b 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.FileScanTaskParser;
+import org.apache.iceberg.ScanTaskParser;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -154,7 +154,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
out.writeInt(fileScanTasks.size());
for (FileScanTask fileScanTask : fileScanTasks) {
- String taskJson = FileScanTaskParser.toJson(fileScanTask);
+ String taskJson = ScanTaskParser.toJson(fileScanTask);
writeTaskJson(out, taskJson, version);
}
@@ -199,7 +199,7 @@ public class IcebergSourceSplit implements SourceSplit, Serializable {
List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
for (int i = 0; i < taskCount; ++i) {
String taskJson = readTaskJson(in, version);
- FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+ FileScanTask task = ScanTaskParser.fromJson(taskJson, caseSensitive);
tasks.add(task);
}