This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b8db3f0042 Core: add JSON parser for ContentFile and FileScanTask (#6934)
b8db3f0042 is described below
commit b8db3f00424c2cc2910a760b9068d38916e8c27a
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Jun 26 09:02:19 2023 -0700
Core: add JSON parser for ContentFile and FileScanTask (#6934)
---
.../main/java/org/apache/iceberg/FileScanTask.java | 5 +
.../org/apache/iceberg/BaseContentScanTask.java | 15 +-
.../java/org/apache/iceberg/BaseFileScanTask.java | 5 +
.../java/org/apache/iceberg/ContentFileParser.java | 270 ++++++++++++++++
.../main/java/org/apache/iceberg/DataFiles.java | 13 +-
.../org/apache/iceberg/FileScanTaskParser.java | 126 ++++++++
.../java/org/apache/iceberg/GenericDataFile.java | 3 +-
.../java/org/apache/iceberg/util/JsonUtil.java | 35 ++-
.../java/org/apache/iceberg/TableTestBase.java | 2 +-
.../org/apache/iceberg/TestContentFileParser.java | 338 +++++++++++++++++++++
.../org/apache/iceberg/TestFileScanTaskParser.java | 108 +++++++
.../apache/iceberg/TestManifestWriterVersions.java | 2 +-
.../iceberg/catalog/TestTableIdentifierParser.java | 2 +-
.../rest/responses/TestListTablesResponse.java | 3 +-
.../java/org/apache/iceberg/util/TestJsonUtil.java | 80 ++++-
format/spec.md | 35 +++
16 files changed, 1026 insertions(+), 16 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/FileScanTask.java b/api/src/main/java/org/apache/iceberg/FileScanTask.java
index d99d924370..5fb4b55459 100644
--- a/api/src/main/java/org/apache/iceberg/FileScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/FileScanTask.java
@@ -29,6 +29,11 @@ public interface FileScanTask extends ContentScanTask<DataFile>, SplittableScanT
*/
List<DeleteFile> deletes();
+  /**
+   * Returns the schema for this file scan task.
+   *
+   * <p>The default implementation carries no schema and always throws; implementations that can
+   * supply a schema must override this method.
+   *
+   * @return the schema of this task
+   * @throws UnsupportedOperationException if this task does not support the schema getter
+   */
+  default Schema schema() {
+    throw new UnsupportedOperationException("Does not support schema getter");
+  }
+
@Override
default long sizeBytes() {
return length() +
deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
diff --git a/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java b/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
index e15b2b3f85..1521133c64 100644
--- a/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
@@ -34,6 +34,7 @@ abstract class BaseContentScanTask<ThisT extends ContentScanTask<F>, F extends C
private final String specString;
private final ResidualEvaluator residuals;
+ private transient volatile Schema schema = null;
private transient volatile PartitionSpec spec = null;
BaseContentScanTask(F file, String schemaString, String specString,
ResidualEvaluator residuals) {
@@ -52,12 +53,24 @@ abstract class BaseContentScanTask<ThisT extends ContentScanTask<F>, F extends C
return file;
}
+  /**
+   * Returns the schema of this task, parsing {@code schemaString} on first access.
+   *
+   * <p>The parsed schema is cached in the volatile {@code schema} field; parsing happens at most
+   * once because the second null check runs while holding this task's monitor.
+   */
+  protected Schema schema() {
+    // read the volatile field once into a local to avoid a second volatile read on the fast path
+    Schema cached = schema;
+    if (cached == null) {
+      synchronized (this) {
+        cached = schema;
+        if (cached == null) {
+          cached = SchemaParser.fromJson(schemaString);
+          this.schema = cached;
+        }
+      }
+    }
+
+    return cached;
+  }
+
@Override
public PartitionSpec spec() {
if (spec == null) {
synchronized (this) {
if (spec == null) {
- this.spec =
PartitionSpecParser.fromJson(SchemaParser.fromJson(schemaString), specString);
+ this.spec = PartitionSpecParser.fromJson(schema(), specString);
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
index 2d7258be71..bff2d724f7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -53,6 +53,11 @@ public class BaseFileScanTask extends BaseContentScanTask<FileScanTask, DataFile
return ImmutableList.copyOf(deletes);
}
+  /**
+   * Returns the schema this task was created with.
+   *
+   * <p>Widens the inherited protected accessor to public so this class fulfils the {@link
+   * FileScanTask#schema()} contract instead of inheriting its throwing default.
+   */
+  @Override
+  public Schema schema() {
+    return super.schema();
+  }
+
@VisibleForTesting
static final class SplitScanTask implements FileScanTask,
MergeableScanTask<SplitScanTask> {
private final long len;
diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
new file mode 100644
index 0000000000..b3edf2927f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class ContentFileParser {
+ private static final String SPEC_ID = "spec-id";
+ private static final String CONTENT = "content";
+ private static final String FILE_PATH = "file-path";
+ private static final String FILE_FORMAT = "file-format";
+ private static final String PARTITION = "partition";
+ private static final String RECORD_COUNT = "record-count";
+ private static final String FILE_SIZE = "file-size-in-bytes";
+ private static final String COLUMN_SIZES = "column-sizes";
+ private static final String VALUE_COUNTS = "value-counts";
+ private static final String NULL_VALUE_COUNTS = "null-value-counts";
+ private static final String NAN_VALUE_COUNTS = "nan-value-counts";
+ private static final String LOWER_BOUNDS = "lower-bounds";
+ private static final String UPPER_BOUNDS = "upper-bounds";
+ private static final String KEY_METADATA = "key-metadata";
+ private static final String SPLIT_OFFSETS = "split-offsets";
+ private static final String EQUALITY_IDS = "equality-ids";
+ private static final String SORT_ORDER_ID = "sort-order-id";
+
+ private ContentFileParser() {}
+
+ private static boolean hasPartitionData(StructLike partitionData) {
+ return partitionData != null && partitionData.size() > 0;
+ }
+
+ static String toJson(ContentFile<?> contentFile, PartitionSpec spec) {
+ return JsonUtil.generate(
+ generator -> ContentFileParser.toJson(contentFile, spec, generator),
false);
+ }
+
+ static void toJson(ContentFile<?> contentFile, PartitionSpec spec,
JsonGenerator generator)
+ throws IOException {
+ Preconditions.checkArgument(contentFile != null, "Invalid content file:
null");
+ Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
+ Preconditions.checkArgument(generator != null, "Invalid JSON generator:
null");
+ Preconditions.checkArgument(
+ contentFile.specId() == spec.specId(),
+ "Invalid partition spec id from content file: expected = %s, actual =
%s",
+ spec.specId(),
+ contentFile.specId());
+ Preconditions.checkArgument(
+ spec.isPartitioned() == hasPartitionData(contentFile.partition()),
+ "Invalid partition data from content file: expected = %s, actual = %s",
+ spec.isPartitioned() ? "partitioned" : "unpartitioned",
+ hasPartitionData(contentFile.partition()) ? "partitioned" :
"unpartitioned");
+
+ generator.writeStartObject();
+
+ // ignore the ordinal position (ContentFile#pos) of the file in a manifest,
+ // as it isn't used and BaseFile constructor doesn't support it.
+
+ generator.writeNumberField(SPEC_ID, contentFile.specId());
+ generator.writeStringField(CONTENT, contentFile.content().name());
+ generator.writeStringField(FILE_PATH, contentFile.path().toString());
+ generator.writeStringField(FILE_FORMAT, contentFile.format().name());
+
+ if (contentFile.partition() != null) {
+ generator.writeFieldName(PARTITION);
+ SingleValueParser.toJson(spec.partitionType(), contentFile.partition(),
generator);
+ }
+
+ generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes());
+
+ metricsToJson(contentFile, generator);
+
+ if (contentFile.keyMetadata() != null) {
+ generator.writeFieldName(KEY_METADATA);
+ SingleValueParser.toJson(DataFile.KEY_METADATA.type(),
contentFile.keyMetadata(), generator);
+ }
+
+ if (contentFile.splitOffsets() != null) {
+ JsonUtil.writeLongArray(SPLIT_OFFSETS, contentFile.splitOffsets(),
generator);
+ }
+
+ if (contentFile.equalityFieldIds() != null) {
+ JsonUtil.writeIntegerArray(EQUALITY_IDS, contentFile.equalityFieldIds(),
generator);
+ }
+
+ if (contentFile.sortOrderId() != null) {
+ generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
+ }
+
+ generator.writeEndObject();
+ }
+
+ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
+ Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for
content file: null");
+ Preconditions.checkArgument(
+ jsonNode.isObject(), "Invalid JSON node for content file: non-object
(%s)", jsonNode);
+ Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
+
+ int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
+ FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT,
jsonNode));
+ String filePath = JsonUtil.getString(FILE_PATH, jsonNode);
+ FileFormat fileFormat =
FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode));
+
+ PartitionData partitionData = null;
+ if (jsonNode.has(PARTITION)) {
+ partitionData = new PartitionData(spec.partitionType());
+ StructLike structLike =
+ (StructLike) SingleValueParser.fromJson(spec.partitionType(),
jsonNode.get(PARTITION));
+ Preconditions.checkState(
+ partitionData.size() == structLike.size(),
+ "Invalid partition data size: expected = %s, actual = %s",
+ partitionData.size(),
+ structLike.size());
+ for (int pos = 0; pos < partitionData.size(); ++pos) {
+ Class<?> javaClass =
spec.partitionType().fields().get(pos).type().typeId().javaClass();
+ partitionData.set(pos, structLike.get(pos, javaClass));
+ }
+ }
+
+ long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode);
+ Metrics metrics = metricsFromJson(jsonNode);
+ ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA,
jsonNode);
+ List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS,
jsonNode);
+ int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS,
jsonNode);
+ Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);
+
+ if (fileContent == FileContent.DATA) {
+ return new GenericDataFile(
+ specId,
+ filePath,
+ fileFormat,
+ partitionData,
+ fileSizeInBytes,
+ metrics,
+ keyMetadata,
+ splitOffsets,
+ equalityFieldIds,
+ sortOrderId);
+ } else {
+ return new GenericDeleteFile(
+ specId,
+ fileContent,
+ filePath,
+ fileFormat,
+ partitionData,
+ fileSizeInBytes,
+ metrics,
+ equalityFieldIds,
+ sortOrderId,
+ splitOffsets,
+ keyMetadata);
+ }
+ }
+
+ private static void metricsToJson(ContentFile<?> contentFile, JsonGenerator
generator)
+ throws IOException {
+ generator.writeNumberField(RECORD_COUNT, contentFile.recordCount());
+
+ if (contentFile.columnSizes() != null) {
+ generator.writeFieldName(COLUMN_SIZES);
+ SingleValueParser.toJson(DataFile.COLUMN_SIZES.type(),
contentFile.columnSizes(), generator);
+ }
+
+ if (contentFile.valueCounts() != null) {
+ generator.writeFieldName(VALUE_COUNTS);
+ SingleValueParser.toJson(DataFile.VALUE_COUNTS.type(),
contentFile.valueCounts(), generator);
+ }
+
+ if (contentFile.nullValueCounts() != null) {
+ generator.writeFieldName(NULL_VALUE_COUNTS);
+ SingleValueParser.toJson(
+ DataFile.NULL_VALUE_COUNTS.type(), contentFile.nullValueCounts(),
generator);
+ }
+
+ if (contentFile.nullValueCounts() != null) {
+ generator.writeFieldName(NAN_VALUE_COUNTS);
+ SingleValueParser.toJson(
+ DataFile.NAN_VALUE_COUNTS.type(), contentFile.nanValueCounts(),
generator);
+ }
+
+ if (contentFile.lowerBounds() != null) {
+ generator.writeFieldName(LOWER_BOUNDS);
+ SingleValueParser.toJson(DataFile.LOWER_BOUNDS.type(),
contentFile.lowerBounds(), generator);
+ }
+
+ if (contentFile.upperBounds() != null) {
+ generator.writeFieldName(UPPER_BOUNDS);
+ SingleValueParser.toJson(DataFile.UPPER_BOUNDS.type(),
contentFile.upperBounds(), generator);
+ }
+ }
+
+ private static Metrics metricsFromJson(JsonNode jsonNode) {
+ long recordCount = JsonUtil.getLong(RECORD_COUNT, jsonNode);
+
+ Map<Integer, Long> columnSizes = null;
+ if (jsonNode.has(COLUMN_SIZES)) {
+ columnSizes =
+ (Map<Integer, Long>)
+ SingleValueParser.fromJson(DataFile.COLUMN_SIZES.type(),
jsonNode.get(COLUMN_SIZES));
+ }
+
+ Map<Integer, Long> valueCounts = null;
+ if (jsonNode.has(VALUE_COUNTS)) {
+ valueCounts =
+ (Map<Integer, Long>)
+ SingleValueParser.fromJson(DataFile.VALUE_COUNTS.type(),
jsonNode.get(VALUE_COUNTS));
+ }
+
+ Map<Integer, Long> nullValueCounts = null;
+ if (jsonNode.has(NULL_VALUE_COUNTS)) {
+ nullValueCounts =
+ (Map<Integer, Long>)
+ SingleValueParser.fromJson(
+ DataFile.NULL_VALUE_COUNTS.type(),
jsonNode.get(NULL_VALUE_COUNTS));
+ }
+
+ Map<Integer, Long> nanValueCounts = null;
+ if (jsonNode.has(NAN_VALUE_COUNTS)) {
+ nanValueCounts =
+ (Map<Integer, Long>)
+ SingleValueParser.fromJson(
+ DataFile.NAN_VALUE_COUNTS.type(),
jsonNode.get(NAN_VALUE_COUNTS));
+ }
+
+ Map<Integer, ByteBuffer> lowerBounds = null;
+ if (jsonNode.has(LOWER_BOUNDS)) {
+ lowerBounds =
+ (Map<Integer, ByteBuffer>)
+ SingleValueParser.fromJson(DataFile.LOWER_BOUNDS.type(),
jsonNode.get(LOWER_BOUNDS));
+ }
+
+ Map<Integer, ByteBuffer> upperBounds = null;
+ if (jsonNode.has(UPPER_BOUNDS)) {
+ upperBounds =
+ (Map<Integer, ByteBuffer>)
+ SingleValueParser.fromJson(DataFile.UPPER_BOUNDS.type(),
jsonNode.get(UPPER_BOUNDS));
+ }
+
+ return new Metrics(
+ recordCount,
+ columnSizes,
+ valueCounts,
+ nullValueCounts,
+ nanValueCounts,
+ lowerBounds,
+ upperBounds);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index ef95c0bdf6..95b2891c98 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
public class DataFiles {
@@ -123,7 +124,6 @@ public class DataFiles {
private FileFormat format = null;
private long recordCount = -1L;
private long fileSizeInBytes = -1L;
- private Integer sortOrderId = SortOrder.unsorted().orderId();
// optional fields
private Map<Integer, Long> columnSizes = null;
@@ -134,6 +134,8 @@ public class DataFiles {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
+ private List<Integer> equalityFieldIds = null;
+ private Integer sortOrderId = SortOrder.unsorted().orderId();
public Builder(PartitionSpec spec) {
this.spec = spec;
@@ -269,6 +271,14 @@ public class DataFiles {
return this;
}
+    /**
+     * Sets the equality field ids of the data file being built.
+     *
+     * <p>A non-null list is copied into an immutable list; a null argument is ignored and leaves
+     * any previously set ids unchanged.
+     *
+     * @param equalityIds the equality field ids, or null to keep the current value
+     * @return this builder
+     */
+    public Builder withEqualityFieldIds(List<Integer> equalityIds) {
+      if (equalityIds == null) {
+        return this;
+      }
+
+      this.equalityFieldIds = ImmutableList.copyOf(equalityIds);
+      return this;
+    }
+
public Builder withEncryptionKeyMetadata(ByteBuffer newKeyMetadata) {
this.keyMetadata = newKeyMetadata;
return this;
@@ -310,6 +320,7 @@ public class DataFiles {
upperBounds),
keyMetadata,
splitOffsets,
+ ArrayUtil.toIntArray(equalityFieldIds),
sortOrderId);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
new file mode 100644
index 0000000000..b747eff98b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FileScanTaskParser {
+ private static final String SCHEMA = "schema";
+ private static final String SPEC = "spec";
+ private static final String DATA_FILE = "data-file";
+ private static final String DELETE_FILES = "delete-files";
+ private static final String RESIDUAL = "residual-filter";
+
+ private FileScanTaskParser() {}
+
+ public static String toJson(FileScanTask fileScanTask) {
+ return JsonUtil.generate(
+ generator -> FileScanTaskParser.toJson(fileScanTask, generator),
false);
+ }
+
+ private static void toJson(FileScanTask fileScanTask, JsonGenerator
generator)
+ throws IOException {
+ Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task:
null");
+ Preconditions.checkArgument(generator != null, "Invalid JSON generator:
null");
+ generator.writeStartObject();
+
+ generator.writeFieldName(SCHEMA);
+ SchemaParser.toJson(fileScanTask.schema(), generator);
+
+ generator.writeFieldName(SPEC);
+ PartitionSpec spec = fileScanTask.spec();
+ PartitionSpecParser.toJson(spec, generator);
+
+ if (fileScanTask.file() != null) {
+ generator.writeFieldName(DATA_FILE);
+ ContentFileParser.toJson(fileScanTask.file(), spec, generator);
+ }
+
+ if (fileScanTask.deletes() != null) {
+ generator.writeArrayFieldStart(DELETE_FILES);
+ for (DeleteFile deleteFile : fileScanTask.deletes()) {
+ ContentFileParser.toJson(deleteFile, spec, generator);
+ }
+ generator.writeEndArray();
+ }
+
+ if (fileScanTask.residual() != null) {
+ generator.writeFieldName(RESIDUAL);
+ ExpressionParser.toJson(fileScanTask.residual(), generator);
+ }
+
+ generator.writeEndObject();
+ }
+
+ public static FileScanTask fromJson(String json, boolean caseSensitive) {
+ Preconditions.checkArgument(json != null, "Invalid JSON string for file
scan task: null");
+ return JsonUtil.parse(json, node -> FileScanTaskParser.fromJson(node,
caseSensitive));
+ }
+
+ private static FileScanTask fromJson(JsonNode jsonNode, boolean
caseSensitive) {
+ Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file
scan task: null");
+ Preconditions.checkArgument(
+ jsonNode.isObject(), "Invalid JSON node for file scan task: non-object
(%s)", jsonNode);
+
+ Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+ String schemaString = SchemaParser.toJson(schema);
+
+ PartitionSpec spec = PartitionSpecParser.fromJson(schema,
JsonUtil.get(SPEC, jsonNode));
+ String specString = PartitionSpecParser.toJson(spec);
+
+ DataFile dataFile = null;
+ if (jsonNode.has(DATA_FILE)) {
+ dataFile = (DataFile)
ContentFileParser.fromJson(jsonNode.get(DATA_FILE), spec);
+ }
+
+ DeleteFile[] deleteFiles = null;
+ if (jsonNode.has(DELETE_FILES)) {
+ JsonNode deletesArray = jsonNode.get(DELETE_FILES);
+ Preconditions.checkArgument(
+ deletesArray.isArray(),
+ "Invalid JSON node for delete files: non-array (%s)",
+ deletesArray);
+ // parse the schema array
+ ImmutableList.Builder<DeleteFile> builder = ImmutableList.builder();
+ for (JsonNode deleteFileNode : deletesArray) {
+ DeleteFile deleteFile = (DeleteFile)
ContentFileParser.fromJson(deleteFileNode, spec);
+ builder.add(deleteFile);
+ }
+
+ deleteFiles = builder.build().toArray(new DeleteFile[0]);
+ }
+
+ Expression filter = Expressions.alwaysTrue();
+ if (jsonNode.has(RESIDUAL)) {
+ filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL));
+ }
+
+ ResidualEvaluator residualEvaluator = ResidualEvaluator.of(spec, filter,
caseSensitive);
+ return new BaseFileScanTask(dataFile, deleteFiles, schemaString,
specString, residualEvaluator);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 34c65e669f..07c5172f1b 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -40,6 +40,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
Metrics metrics,
ByteBuffer keyMetadata,
List<Long> splitOffsets,
+ int[] equalityFieldIds,
Integer sortOrderId) {
super(
specId,
@@ -56,7 +57,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
metrics.lowerBounds(),
metrics.upperBounds(),
splitOffsets,
- null,
+ equalityFieldIds,
sortOrderId,
keyMetadata);
}
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 84c0681164..aa90c63f80 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -26,14 +26,17 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
public class JsonUtil {
@@ -173,12 +176,24 @@ public class JsonUtil {
return getString(property, node);
}
+ public static ByteBuffer getByteBufferOrNull(String property, JsonNode node)
{
+ if (!node.has(property) || node.get(property).isNull()) {
+ return null;
+ }
+
+ JsonNode pNode = node.get(property);
+ Preconditions.checkArgument(
+ pNode.isTextual(), "Cannot parse byte buffer from non-text value: %s:
%s", property, pNode);
+ return ByteBuffer.wrap(
+
BaseEncoding.base16().decode(pNode.textValue().toUpperCase(Locale.ROOT)));
+ }
+
public static Map<String, String> getStringMap(String property, JsonNode
node) {
Preconditions.checkArgument(node.has(property), "Cannot parse missing map:
%s", property);
JsonNode pNode = node.get(property);
Preconditions.checkArgument(
pNode != null && !pNode.isNull() && pNode.isObject(),
- "Cannot parse from non-object value: %s: %s",
+ "Cannot parse string map from non-object value: %s: %s",
property,
pNode);
@@ -229,6 +244,14 @@ public class JsonUtil {
.build();
}
+  /**
+   * Reads an integer array property as a primitive array.
+   *
+   * @return the values, or null if the property is missing or an explicit JSON null
+   */
+  public static int[] getIntArrayOrNull(String property, JsonNode node) {
+    if (node.hasNonNull(property)) {
+      List<Integer> boxed = getIntegerList(property, node);
+      return ArrayUtil.toIntArray(boxed);
+    }
+
+    return null;
+  }
+
public static List<Integer> getIntegerList(String property, JsonNode node) {
Preconditions.checkArgument(node.has(property), "Cannot parse missing
list: %s", property);
return ImmutableList.<Integer>builder()
@@ -256,6 +279,14 @@ public class JsonUtil {
return ImmutableList.<Long>builder().addAll(new
JsonLongArrayIterator(property, node)).build();
}
+  /**
+   * Reads a long array property as an immutable list.
+   *
+   * @return the values in array order, or null if the property is missing or an explicit JSON
+   *     null
+   */
+  public static List<Long> getLongListOrNull(String property, JsonNode node) {
+    if (node.hasNonNull(property)) {
+      return ImmutableList.copyOf(new JsonLongArrayIterator(property, node));
+    }
+
+    return null;
+  }
+
public static Set<Long> getLongSetOrNull(String property, JsonNode node) {
if (!node.hasNonNull(property)) {
return null;
@@ -291,7 +322,7 @@ public class JsonUtil {
JsonNode pNode = node.get(property);
Preconditions.checkArgument(
pNode != null && !pNode.isNull() && pNode.isArray(),
- "Cannot parse from non-array value: %s: %s",
+ "Cannot parse JSON array from non-array value: %s: %s",
property,
pNode);
this.elements = pNode.elements();
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 038dfadbff..a800214bc9 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -57,7 +57,7 @@ public class TableTestBase {
protected static final int BUCKETS_NUMBER = 16;
// Partition spec used to create tables
- protected static final PartitionSpec SPEC =
+ public static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).bucket("data", BUCKETS_NUMBER).build();
static final DataFile FILE_A =
diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
new file mode 100644
index 0000000000..9360f571c5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class TestContentFileParser {
+  /** Every parser entry point rejects a null argument with a descriptive message. */
+  @Test
+  public void testNullArguments() throws Exception {
+    PartitionSpec spec = TableTestBase.SPEC;
+    DataFile file = TableTestBase.FILE_A;
+
+    Assertions.assertThatThrownBy(() -> ContentFileParser.toJson(null, spec))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid content file: null");
+
+    Assertions.assertThatThrownBy(() -> ContentFileParser.toJson(file, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid partition spec: null");
+
+    Assertions.assertThatThrownBy(() -> ContentFileParser.toJson(file, spec, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    Assertions.assertThatThrownBy(() -> ContentFileParser.fromJson(null, spec))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for content file: null");
+
+    // a valid node isolates the null-spec check on the parsing side
+    JsonNode validNode = JsonUtil.mapper().readTree(ContentFileParser.toJson(file, spec));
+    Assertions.assertThatThrownBy(() -> ContentFileParser.fromJson(validNode, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid partition spec: null");
+  }
+
+  /** Data files serialize to the expected JSON and survive a JSON round trip. */
+  @ParameterizedTest
+  @MethodSource("provideSpecAndDataFile")
+  public void testDataFile(PartitionSpec spec, DataFile dataFile, String expectedJson)
+      throws Exception {
+    String actualJson = ContentFileParser.toJson(dataFile, spec);
+    Assertions.assertThat(actualJson).isEqualTo(expectedJson);
+
+    JsonNode parsedNode = JsonUtil.mapper().readTree(actualJson);
+    ContentFile<?> roundTripped = ContentFileParser.fromJson(parsedNode, spec);
+    Assertions.assertThat(roundTripped).isInstanceOf(DataFile.class);
+    assertContentFileEquals(dataFile, roundTripped, spec);
+  }
+
+  /** Delete files serialize to the expected JSON and survive a JSON round trip. */
+  @ParameterizedTest
+  @MethodSource("provideSpecAndDeleteFile")
+  public void testDeleteFile(PartitionSpec spec, DeleteFile deleteFile, String expectedJson)
+      throws Exception {
+    String actualJson = ContentFileParser.toJson(deleteFile, spec);
+    Assertions.assertThat(actualJson).isEqualTo(expectedJson);
+
+    JsonNode parsedNode = JsonUtil.mapper().readTree(actualJson);
+    ContentFile<?> roundTripped = ContentFileParser.fromJson(parsedNode, spec);
+    Assertions.assertThat(roundTripped).isInstanceOf(DeleteFile.class);
+    assertContentFileEquals(deleteFile, roundTripped, spec);
+  }
+
+  /** Cases for {@code testDataFile}: (spec, data file, expected JSON) for both specs. */
+  private static Stream<Arguments> provideSpecAndDataFile() {
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+    PartitionSpec partitioned = TableTestBase.SPEC;
+    return Stream.of(
+        Arguments.of(
+            unpartitioned,
+            dataFileWithRequiredOnly(unpartitioned),
+            dataFileJsonWithRequiredOnly(unpartitioned)),
+        Arguments.of(
+            unpartitioned,
+            dataFileWithAllOptional(unpartitioned),
+            dataFileJsonWithAllOptional(unpartitioned)),
+        Arguments.of(
+            partitioned,
+            dataFileWithRequiredOnly(partitioned),
+            dataFileJsonWithRequiredOnly(partitioned)),
+        Arguments.of(
+            partitioned,
+            dataFileWithAllOptional(partitioned),
+            dataFileJsonWithAllOptional(partitioned)));
+  }
+
+  /** Builds a data file with only path, size and record count (plus partition when needed). */
+  private static DataFile dataFileWithRequiredOnly(PartitionSpec spec) {
+    DataFiles.Builder builder = DataFiles.builder(spec);
+    if (spec.isPartitioned()) {
+      // a partition path is the simplest way to populate partition data here
+      builder.withPartitionPath("data_bucket=1");
+    }
+
+    return builder
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+  }
+
+  /** Expected JSON for {@code dataFileWithRequiredOnly}; only the partition tuple varies. */
+  private static String dataFileJsonWithRequiredOnly(PartitionSpec spec) {
+    String partitionJson = spec.isUnpartitioned() ? "{}" : "{\"1000\":1}";
+    return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+        + "\"file-format\":\"PARQUET\",\"partition\":"
+        + partitionJson
+        + ",\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
+  }
+
+  /** Expected JSON for {@code dataFileWithAllOptional}; only the partition tuple varies. */
+  private static String dataFileJsonWithAllOptional(PartitionSpec spec) {
+    String partitionJson = spec.isUnpartitioned() ? "{}" : "{\"1000\":1}";
+    return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+        + "\"file-format\":\"PARQUET\",\"partition\":"
+        + partitionJson
+        + ",\"file-size-in-bytes\":350,\"record-count\":10,"
+        + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+        + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+        + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+        + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+        + "\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+        + "\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+        + "\"key-metadata\":\"00000000000000000000000000000000\","
+        + "\"split-offsets\":[128,256],\"equality-ids\":[1],\"sort-order-id\":1}";
+  }
+
+  /** Builds a data file with every optional field (metrics, offsets, ids, key, sort order) set. */
+  private static DataFile dataFileWithAllOptional(PartitionSpec spec) {
+    Metrics metrics =
+        new Metrics(
+            10L, // record count
+            ImmutableMap.of(3, 100L, 4, 200L), // column sizes
+            ImmutableMap.of(3, 90L, 4, 180L), // value counts
+            ImmutableMap.of(3, 10L, 4, 20L), // null value counts
+            ImmutableMap.of(3, 0L, 4, 0L), // nan value counts
+            ImmutableMap.of(
+                3,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 1),
+                4,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 2)), // lower bounds
+            ImmutableMap.of(
+                3,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 5),
+                4,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 10))); // upper bounds
+    SortOrder sortOrder =
+        SortOrder.builderFor(TableTestBase.SCHEMA)
+            .withOrderId(1)
+            .sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST)
+            .build();
+
+    DataFiles.Builder builder =
+        DataFiles.builder(spec)
+            .withPath("/path/to/data-with-stats.parquet")
+            .withMetrics(metrics)
+            .withFileSizeInBytes(350)
+            .withSplitOffsets(Arrays.asList(128L, 256L))
+            .withEqualityFieldIds(Collections.singletonList(1))
+            .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[16]))
+            .withSortOrder(sortOrder);
+
+    if (spec.isPartitioned()) {
+      // a partition path is the simplest way to populate partition data here
+      builder.withPartitionPath("data_bucket=1");
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Supplies {@code (spec, deleteFile, expectedJson)} argument triples. They cover the
+   * required-only and all-optional delete files for both the unpartitioned spec and {@code
+   * TableTestBase.SPEC}.
+   */
+  private static Stream<Arguments> provideSpecAndDeleteFile() {
+    PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
+    PartitionSpec partitioned = TableTestBase.SPEC;
+    return Stream.of(
+        Arguments.of(
+            unpartitioned,
+            deleteFileWithRequiredOnly(unpartitioned),
+            deleteFileJsonWithRequiredOnly(unpartitioned)),
+        Arguments.of(
+            unpartitioned,
+            deleteFileWithAllOptional(unpartitioned),
+            deleteFileJsonWithAllOptional(unpartitioned)),
+        Arguments.of(
+            partitioned,
+            deleteFileWithRequiredOnly(partitioned),
+            deleteFileJsonWithRequiredOnly(partitioned)),
+        Arguments.of(
+            partitioned,
+            deleteFileWithAllOptional(partitioned),
+            deleteFileJsonWithAllOptional(partitioned)));
+  }
+
+  /**
+   * Builds a position delete file that carries only the required fields: path, format, size 1234
+   * and a record count of 9. For a partitioned spec the first partition field is set to 9;
+   * otherwise the partition is null.
+   */
+  private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) {
+    PartitionData partition;
+    if (spec.isPartitioned()) {
+      partition = new PartitionData(spec.partitionType());
+      partition.set(0, 9);
+    } else {
+      partition = null;
+    }
+
+    // record count only; every per-column metric map is absent
+    Metrics recordCountOnly = new Metrics(9L, null, null, null, null);
+
+    return new GenericDeleteFile(
+        spec.specId(),
+        FileContent.POSITION_DELETES,
+        "/path/to/delete-a.parquet",
+        FileFormat.PARQUET,
+        partition,
+        1234,
+        recordCountOnly,
+        null, // equality field ids
+        null, // sort order id
+        null, // split offsets
+        null); // key metadata
+  }
+
+  /**
+   * Builds an equality delete file with every optional field populated. This includes full column
+   * metrics, equality field ids {@code [3]}, sort order id 1, split offsets {@code [128]} and 16
+   * bytes of zeroed key metadata. The partition object is always created. For a partitioned spec
+   * its first field is set to 9; otherwise no values are set in it.
+   */
+  private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) {
+    PartitionData partitionData = new PartitionData(spec.partitionType());
+    if (spec.isPartitioned()) {
+      partitionData.set(0, 9);
+    }
+
+    Metrics metrics =
+        new Metrics(
+            10L, // record count
+            ImmutableMap.of(3, 100L, 4, 200L), // column sizes
+            ImmutableMap.of(3, 90L, 4, 180L), // value counts
+            ImmutableMap.of(3, 10L, 4, 20L), // null value counts
+            ImmutableMap.of(3, 0L, 4, 0L), // nan value counts
+            // lower bounds
+            ImmutableMap.of(
+                3,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 1),
+                4,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 2)),
+            // upper bounds
+            ImmutableMap.of(
+                3,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 5),
+                4,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 10)));
+
+    return new GenericDeleteFile(
+        spec.specId(),
+        FileContent.EQUALITY_DELETES,
+        "/path/to/delete-with-stats.parquet",
+        FileFormat.PARQUET,
+        partitionData,
+        1234,
+        metrics,
+        new int[] {3}, // equality field ids
+        1, // sort order id
+        Arrays.asList(128L), // split offsets
+        ByteBuffer.wrap(new byte[16])); // key metadata
+  }
+
+  /**
+   * Returns the exact JSON expected from serializing {@code deleteFileWithRequiredOnly(spec)}. The
+   * two variants differ only in the partition object: {@code {}} for an unpartitioned spec and
+   * {@code {"1000":9}} otherwise.
+   *
+   * <p>NOTE(review): {@code spec-id} is hard-coded to 0, so this assumes every spec passed in has
+   * id 0.
+   */
+  private static String deleteFileJsonWithRequiredOnly(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\","
+          + "\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":1234,\"record-count\":9}";
+    } else {
+      return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\","
+          + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":9},\"file-size-in-bytes\":1234,\"record-count\":9}";
+    }
+  }
+
+  /**
+   * Returns the exact JSON expected from serializing {@code deleteFileWithAllOptional(spec)}. The
+   * two variants differ only in the partition object: {@code {}} for an unpartitioned spec and
+   * {@code {"1000":9}} otherwise.
+   *
+   * <p>NOTE(review): {@code spec-id} is hard-coded to 0, so this assumes every spec passed in has
+   * id 0.
+   */
+  private static String deleteFileJsonWithAllOptional(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+          + "\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":1234,\"record-count\":10,"
+          + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+          + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+          + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+          + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+          + "\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+          + "\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+          + "\"key-metadata\":\"00000000000000000000000000000000\","
+          + "\"split-offsets\":[128],\"equality-ids\":[3],\"sort-order-id\":1}";
+    } else {
+      return "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+          + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":9},\"file-size-in-bytes\":1234,\"record-count\":10,"
+          + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+          + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+          + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+          + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+          + "\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+          + "\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+          + "\"key-metadata\":\"00000000000000000000000000000000\","
+          + "\"split-offsets\":[128],\"equality-ids\":[3],\"sort-order-id\":1}";
+    }
+  }
+
+  /**
+   * Asserts that two content files have the same runtime class and compares them field by field.
+   * The fields checked are spec id, content, path, format, partition, record count, file size,
+   * column sizes, value/null/NaN counts, lower/upper bounds, key metadata, split offsets, equality
+   * field ids and sort order id.
+   *
+   * <p>Partitions are compared with a comparator built from {@code spec.partitionType()} rather
+   * than {@code equals}. Presumably this is because the expected and parsed partition objects may
+   * be different {@code StructLike} implementations.
+   */
+  static void assertContentFileEquals(
+      ContentFile<?> expected, ContentFile<?> actual, PartitionSpec spec) {
+    Assertions.assertThat(actual.getClass()).isEqualTo(expected.getClass());
+    Assertions.assertThat(actual.specId()).isEqualTo(expected.specId());
+    Assertions.assertThat(actual.content()).isEqualTo(expected.content());
+    Assertions.assertThat(actual.path()).isEqualTo(expected.path());
+    Assertions.assertThat(actual.format()).isEqualTo(expected.format());
+    Assertions.assertThat(actual.partition())
+        .usingComparator(Comparators.forType(spec.partitionType()))
+        .isEqualTo(expected.partition());
+    Assertions.assertThat(actual.recordCount()).isEqualTo(expected.recordCount());
+    Assertions.assertThat(actual.fileSizeInBytes()).isEqualTo(expected.fileSizeInBytes());
+    Assertions.assertThat(actual.columnSizes()).isEqualTo(expected.columnSizes());
+    Assertions.assertThat(actual.valueCounts()).isEqualTo(expected.valueCounts());
+    Assertions.assertThat(actual.nullValueCounts()).isEqualTo(expected.nullValueCounts());
+    Assertions.assertThat(actual.nanValueCounts()).isEqualTo(expected.nanValueCounts());
+    Assertions.assertThat(actual.lowerBounds()).isEqualTo(expected.lowerBounds());
+    Assertions.assertThat(actual.upperBounds()).isEqualTo(expected.upperBounds());
+    Assertions.assertThat(actual.keyMetadata()).isEqualTo(expected.keyMetadata());
+    Assertions.assertThat(actual.splitOffsets()).isEqualTo(expected.splitOffsets());
+    Assertions.assertThat(actual.equalityFieldIds()).isEqualTo(expected.equalityFieldIds());
+    Assertions.assertThat(actual.sortOrderId()).isEqualTo(expected.sortOrderId());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
new file mode 100644
index 0000000000..221a5507b1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Tests JSON serialization and deserialization of {@link FileScanTask} via {@link FileScanTaskParser}. */
+public class TestFileScanTaskParser {
+  @Test
+  public void testNullArguments() {
+    Assertions.assertThatThrownBy(() -> FileScanTaskParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid file scan task: null");
+
+    Assertions.assertThatThrownBy(() -> FileScanTaskParser.fromJson(null, true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON string for file scan task: null");
+  }
+
+  /**
+   * Serializes a task to JSON, checks the exact JSON text, then parses it back and compares it
+   * with the original. This runs for both case-sensitivity settings. {@link #createScanTask}
+   * also supports unpartitioned specs, but this test only exercises {@code TableTestBase.SPEC}.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testParser(boolean caseSensitive) {
+    PartitionSpec spec = TableTestBase.SPEC;
+    FileScanTask fileScanTask = createScanTask(spec, caseSensitive);
+    String jsonStr = FileScanTaskParser.toJson(fileScanTask);
+    Assertions.assertThat(jsonStr).isEqualTo(expectedFileScanTaskJson());
+    FileScanTask deserializedTask = FileScanTaskParser.fromJson(jsonStr, caseSensitive);
+    assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, caseSensitive);
+  }
+
+  /**
+   * Builds a scan task over {@code FILE_A} with two delete files. The residual is
+   * {@code alwaysTrue()} for an unpartitioned spec and is evaluated from {@code id = 1}
+   * otherwise.
+   */
+  private FileScanTask createScanTask(PartitionSpec spec, boolean caseSensitive) {
+    ResidualEvaluator residualEvaluator;
+    if (spec.isUnpartitioned()) {
+      residualEvaluator = ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
+    } else {
+      residualEvaluator = ResidualEvaluator.of(spec, Expressions.equal("id", 1), caseSensitive);
+    }
+
+    return new BaseFileScanTask(
+        TableTestBase.FILE_A,
+        new DeleteFile[] {TableTestBase.FILE_A_DELETES, TableTestBase.FILE_A2_DELETES},
+        SchemaParser.toJson(TableTestBase.SCHEMA),
+        PartitionSpecParser.toJson(spec),
+        residualEvaluator);
+  }
+
+  /** Returns the exact JSON expected for the task built by {@link #createScanTask} with SPEC. */
+  private String expectedFileScanTaskJson() {
+    return "{\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":["
+        + "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+        + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+        + "\"spec\":{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\","
+        + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]},"
+        + "\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+        + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},"
+        + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+        + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+        + "\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+        + "\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1},"
+        + "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/data-a2-deletes.parquet\","
+        + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,"
+        + "\"record-count\":1,\"equality-ids\":[1],\"sort-order-id\":0}],"
+        + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}";
+  }
+
+  /**
+   * Compares two scan tasks. It checks the data file and each delete file (in order) field by
+   * field, then schema sameness, spec equality, and residual equivalence under the given case
+   * sensitivity.
+   */
+  private static void assertFileScanTaskEquals(
+      FileScanTask expected, FileScanTask actual, PartitionSpec spec, boolean caseSensitive) {
+    TestContentFileParser.assertContentFileEquals(expected.file(), actual.file(), spec);
+    Assertions.assertThat(actual.deletes()).hasSameSizeAs(expected.deletes());
+    for (int pos = 0; pos < expected.deletes().size(); ++pos) {
+      TestContentFileParser.assertContentFileEquals(
+          expected.deletes().get(pos), actual.deletes().get(pos), spec);
+    }
+
+    Assertions.assertThat(expected.schema().sameSchema(actual.schema()))
+        .as("Schema should match")
+        .isTrue();
+    Assertions.assertThat(actual.spec()).isEqualTo(expected.spec());
+    Assertions.assertThat(
+            ExpressionUtil.equivalent(
+                expected.residual(),
+                actual.residual(),
+                TableTestBase.SCHEMA.asStruct(),
+                caseSensitive))
+        .as("Residual expression should match")
+        .isTrue();
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 740791b255..08b27d7460 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -78,7 +78,7 @@ public class TestManifestWriterVersions {
private static final DataFile DATA_FILE =
new GenericDataFile(
- 0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS,
SORT_ORDER_ID);
+ 0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, null,
SORT_ORDER_ID);
private static final List<Integer> EQUALITY_IDS = ImmutableList.of(1);
private static final int[] EQUALITY_ID_ARR = new int[] {1};
diff --git a/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java b/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
index 04972e5cd6..587b9395f5 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
@@ -95,7 +95,7 @@ public class TestTableIdentifierParser {
String invalidNamespace =
"{\"namespace\":\"accounting.tax\",\"name\":\"paid\"}";
Assertions.assertThatThrownBy(() ->
TableIdentifierParser.fromJson(invalidNamespace))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: namespace:
\"accounting.tax\"");
+ .hasMessage("Cannot parse JSON array from non-array value: namespace:
\"accounting.tax\"");
String invalidName =
"{\"namespace\":[\"accounting\",\"tax\"],\"name\":1234}";
Assertions.assertThatThrownBy(() ->
TableIdentifierParser.fromJson(invalidName))
diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
index 9e8bffaf6d..116d43a6d1 100644
--- a/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
+++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
@@ -67,7 +67,8 @@ public class TestListTablesResponse extends RequestResponseTestBase<ListTablesRe
"{\"identifiers\":[{\"namespace\":\"accounting.tax\",\"name\":\"paid\"}]}";
Assertions.assertThatThrownBy(() ->
deserialize(jsonWithInvalidIdentifiersInList))
.isInstanceOf(JsonProcessingException.class)
- .hasMessageContaining("Cannot parse from non-array value");
+ .hasMessageContaining(
+ "Cannot parse JSON array from non-array value: namespace:
\"accounting.tax\"");
String jsonWithInvalidIdentifiersInList2 =
"{\"identifiers\":[{\"namespace\":[\"accounting\",\"tax\"],\"name\":\"paid\"},\"accounting.tax.paid\"]}";
diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
index 8a769fccb9..75191f4d70 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
@@ -19,10 +19,12 @@
package org.apache.iceberg.util;
import com.fasterxml.jackson.core.JsonProcessingException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -167,6 +169,26 @@ public class TestJsonUtil {
.hasMessage("Cannot parse to a string value: x: 23");
}
+  /**
+   * Missing and explicit-null fields yield null. A base16 (hex) text value decodes to its bytes,
+   * and a non-text value is rejected.
+   */
+  @Test
+  public void getByteBufferOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getByteBufferOrNull("x", JsonUtil.mapper().readTree("{}")))
+        .isNull();
+    Assertions.assertThat(
+            JsonUtil.getByteBufferOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isNull();
+
+    byte[] bytes = new byte[] {1, 2, 3, 4};
+    String base16Str = BaseEncoding.base16().encode(bytes);
+    String json = String.format("{\"x\": \"%s\"}", base16Str);
+    ByteBuffer byteBuffer = JsonUtil.getByteBufferOrNull("x", JsonUtil.mapper().readTree(json));
+    Assertions.assertThat(byteBuffer.array()).isEqualTo(bytes);
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getByteBufferOrNull("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse byte buffer from non-text value: x: 23");
+  }
+
@Test
public void getBool() throws JsonProcessingException {
Assertions.assertThatThrownBy(() -> JsonUtil.getBool("x",
JsonUtil.mapper().readTree("{}")))
@@ -194,6 +216,28 @@ public class TestJsonUtil {
.isFalse();
}
+  /**
+   * Missing and explicit-null fields yield null. A non-int element is rejected with a message
+   * naming the field, and a valid array is returned as {@code int[]}.
+   */
+  @Test
+  public void getIntArrayOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getIntArrayOrNull("items", JsonUtil.mapper().readTree("{}")))
+        .isNull();
+
+    Assertions.assertThat(
+            JsonUtil.getIntArrayOrNull("items", JsonUtil.mapper().readTree("{\"items\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                JsonUtil.getIntArrayOrNull(
+                    "items", JsonUtil.mapper().readTree("{\"items\": [13, \"23\"]}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse integer from non-int value in items: \"23\"");
+
+    Assertions.assertThat(
+            JsonUtil.getIntArrayOrNull(
+                "items", JsonUtil.mapper().readTree("{\"items\": [23, 45]}")))
+        .isEqualTo(new int[] {23, 45});
+  }
+
@Test
public void getIntegerList() throws JsonProcessingException {
Assertions.assertThatThrownBy(
@@ -204,7 +248,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getIntegerList("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: items: null");
+ .hasMessage("Cannot parse JSON array from non-array value: items:
null");
Assertions.assertThatThrownBy(
() ->
@@ -240,7 +284,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getIntegerSet("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: items: null");
+ .hasMessage("Cannot parse JSON array from non-array value: items:
null");
Assertions.assertThatThrownBy(
() ->
@@ -286,7 +330,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getLongList("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: items: null");
+ .hasMessage("Cannot parse JSON array from non-array value: items:
null");
Assertions.assertThatThrownBy(
() ->
@@ -312,6 +356,28 @@ public class TestJsonUtil {
.isEqualTo(items);
}
+  /**
+   * Missing and explicit-null fields yield null. A non-long element is rejected with a message
+   * naming the field, and a valid array is returned as a list of longs.
+   */
+  @Test
+  public void getLongListOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getLongListOrNull("items", JsonUtil.mapper().readTree("{}")))
+        .isNull();
+
+    Assertions.assertThat(
+            JsonUtil.getLongListOrNull("items", JsonUtil.mapper().readTree("{\"items\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                JsonUtil.getLongListOrNull(
+                    "items", JsonUtil.mapper().readTree("{\"items\": [13, \"23\"]}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse long from non-long value in items: \"23\"");
+
+    Assertions.assertThat(
+            JsonUtil.getLongListOrNull(
+                "items", JsonUtil.mapper().readTree("{\"items\": [23, 45]}")))
+        .containsExactlyElementsOf(Arrays.asList(23L, 45L));
+  }
+
@Test
public void getLongSet() throws JsonProcessingException {
Assertions.assertThatThrownBy(
@@ -322,7 +388,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getLongSet("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: items: null");
+ .hasMessage("Cannot parse JSON array from non-array value: items:
null");
Assertions.assertThatThrownBy(
() ->
@@ -367,7 +433,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getStringList("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: items: null");
+ .hasMessage("Cannot parse JSON array from non-array value: items:
null");
Assertions.assertThatThrownBy(
() ->
@@ -426,7 +492,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getStringSet("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-array value: items: null");
+ .hasMessage("Cannot parse JSON array from non-array value: items:
null");
Assertions.assertThatThrownBy(
() ->
@@ -451,7 +517,7 @@ public class TestJsonUtil {
Assertions.assertThatThrownBy(
() -> JsonUtil.getStringMap("items",
JsonUtil.mapper().readTree("{\"items\": null}")))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse from non-object value: items: null");
+ .hasMessage("Cannot parse string map from non-object value: items:
null");
Assertions.assertThatThrownBy(
() ->
diff --git a/format/spec.md b/format/spec.md
index 58cfc22911..60c0f99c3f 100644
--- a/format/spec.md
+++ b/format/spec.md
@@ -1128,6 +1128,41 @@ Example
] } ]
```
+### Content File (Data and Delete) Serialization
+
+A content file (data or delete) is serialized as a JSON object according to the following table.
+
+| Metadata field |JSON representation|Example|
+|--------------------------|--- |--- |
+| **`spec-id`** |`JSON int`|`1`|
+| **`content`** |`JSON string`|`DATA`, `POSITION_DELETES`, `EQUALITY_DELETES`|
+| **`file-path`** |`JSON string`|`"s3://b/wh/data.db/table"`|
+| **`file-format`** |`JSON string`|`AVRO`, `ORC`, `PARQUET`|
+| **`partition`** |`JSON object: Partition data tuple, keyed by partition field id`|`{"1000":1}`|
+| **`record-count`** |`JSON long`|`1`|
+| **`file-size-in-bytes`** |`JSON long`|`1024`|
+| **`column-sizes`** |`JSON object: Map from column id to the total size on disk of all regions that store the column`|`{"keys":[3,4],"values":[100,200]}`|
+| **`value-counts`** |`JSON object: Map from column id to number of values in the column (including null and NaN values)`|`{"keys":[3,4],"values":[90,180]}`|
+| **`null-value-counts`** |`JSON object: Map from column id to number of null values in the column`|`{"keys":[3,4],"values":[10,20]}`|
+| **`nan-value-counts`** |`JSON object: Map from column id to number of NaN values in the column`|`{"keys":[3,4],"values":[0,0]}`|
+| **`lower-bounds`** |`JSON object: Map from column id to the column's lower bound, as its binary single-value serialization encoded as a hexadecimal string`|`{"keys":[3,4],"values":["01000000","02000000"]}`|
+| **`upper-bounds`** |`JSON object: Map from column id to the column's upper bound, as its binary single-value serialization encoded as a hexadecimal string`|`{"keys":[3,4],"values":["05000000","0A000000"]}`|
+| **`key-metadata`** |`JSON string: Encryption key metadata bytes encoded as a hexadecimal string`|`00000000000000000000000000000000`|
+| **`split-offsets`** |`JSON list of long: Split offsets for the data file`|`[128,256]`|
+| **`equality-ids`** |`JSON list of int: Field ids used to determine row equality in equality delete files`|`[1]`|
+| **`sort-order-id`** |`JSON int`|`1`|
+
+### File Scan Task Serialization
+
+A file scan task is serialized as a JSON object according to the following table.
+
+| Metadata field |JSON representation|Example|
+|--------------------------|--- |--- |
+| **`schema`** |`JSON object`|See the Schemas section above|
+| **`spec`** |`JSON object`|See the Partition Specs section above|
+| **`data-file`** |`JSON object`|See Content File (Data and Delete) Serialization above|
+| **`delete-files`** |`JSON list of objects`|See Content File (Data and Delete) Serialization above|
+| **`residual-filter`** |`JSON object: residual filter expression`|`{"type":"eq","term":"id","value":1}`|
## Appendix D: Single-value serialization