This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b8db3f0042  Core: add JSON parser for ContentFile and FileScanTask 
(#6934)
b8db3f0042 is described below

commit b8db3f00424c2cc2910a760b9068d38916e8c27a
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Jun 26 09:02:19 2023 -0700

     Core: add JSON parser for ContentFile and FileScanTask (#6934)
---
 .../main/java/org/apache/iceberg/FileScanTask.java |   5 +
 .../org/apache/iceberg/BaseContentScanTask.java    |  15 +-
 .../java/org/apache/iceberg/BaseFileScanTask.java  |   5 +
 .../java/org/apache/iceberg/ContentFileParser.java | 270 ++++++++++++++++
 .../main/java/org/apache/iceberg/DataFiles.java    |  13 +-
 .../org/apache/iceberg/FileScanTaskParser.java     | 126 ++++++++
 .../java/org/apache/iceberg/GenericDataFile.java   |   3 +-
 .../java/org/apache/iceberg/util/JsonUtil.java     |  35 ++-
 .../java/org/apache/iceberg/TableTestBase.java     |   2 +-
 .../org/apache/iceberg/TestContentFileParser.java  | 338 +++++++++++++++++++++
 .../org/apache/iceberg/TestFileScanTaskParser.java | 108 +++++++
 .../apache/iceberg/TestManifestWriterVersions.java |   2 +-
 .../iceberg/catalog/TestTableIdentifierParser.java |   2 +-
 .../rest/responses/TestListTablesResponse.java     |   3 +-
 .../java/org/apache/iceberg/util/TestJsonUtil.java |  80 ++++-
 format/spec.md                                     |  35 +++
 16 files changed, 1026 insertions(+), 16 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/FileScanTask.java 
b/api/src/main/java/org/apache/iceberg/FileScanTask.java
index d99d924370..5fb4b55459 100644
--- a/api/src/main/java/org/apache/iceberg/FileScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/FileScanTask.java
@@ -29,6 +29,11 @@ public interface FileScanTask extends 
ContentScanTask<DataFile>, SplittableScanT
    */
   List<DeleteFile> deletes();
 
+  /** Return the schema for this file scan task. */
+  default Schema schema() {
+    throw new UnsupportedOperationException("Does not support schema getter");
+  }
+
   @Override
   default long sizeBytes() {
     return length() + 
deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
diff --git a/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java 
b/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
index e15b2b3f85..1521133c64 100644
--- a/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
@@ -34,6 +34,7 @@ abstract class BaseContentScanTask<ThisT extends 
ContentScanTask<F>, F extends C
   private final String specString;
   private final ResidualEvaluator residuals;
 
+  private transient volatile Schema schema = null;
   private transient volatile PartitionSpec spec = null;
 
   BaseContentScanTask(F file, String schemaString, String specString, 
ResidualEvaluator residuals) {
@@ -52,12 +53,24 @@ abstract class BaseContentScanTask<ThisT extends 
ContentScanTask<F>, F extends C
     return file;
   }
 
+  protected Schema schema() {
+    if (schema == null) {
+      synchronized (this) {
+        if (schema == null) {
+          this.schema = SchemaParser.fromJson(schemaString);
+        }
+      }
+    }
+
+    return schema;
+  }
+
   @Override
   public PartitionSpec spec() {
     if (spec == null) {
       synchronized (this) {
         if (spec == null) {
-          this.spec = 
PartitionSpecParser.fromJson(SchemaParser.fromJson(schemaString), specString);
+          this.spec = PartitionSpecParser.fromJson(schema(), specString);
         }
       }
     }
diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java 
b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
index 2d7258be71..bff2d724f7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -53,6 +53,11 @@ public class BaseFileScanTask extends 
BaseContentScanTask<FileScanTask, DataFile
     return ImmutableList.copyOf(deletes);
   }
 
+  @Override
+  public Schema schema() {
+    return super.schema();
+  }
+
   @VisibleForTesting
   static final class SplitScanTask implements FileScanTask, 
MergeableScanTask<SplitScanTask> {
     private final long len;
diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java 
b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
new file mode 100644
index 0000000000..b3edf2927f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class ContentFileParser {
+  private static final String SPEC_ID = "spec-id";
+  private static final String CONTENT = "content";
+  private static final String FILE_PATH = "file-path";
+  private static final String FILE_FORMAT = "file-format";
+  private static final String PARTITION = "partition";
+  private static final String RECORD_COUNT = "record-count";
+  private static final String FILE_SIZE = "file-size-in-bytes";
+  private static final String COLUMN_SIZES = "column-sizes";
+  private static final String VALUE_COUNTS = "value-counts";
+  private static final String NULL_VALUE_COUNTS = "null-value-counts";
+  private static final String NAN_VALUE_COUNTS = "nan-value-counts";
+  private static final String LOWER_BOUNDS = "lower-bounds";
+  private static final String UPPER_BOUNDS = "upper-bounds";
+  private static final String KEY_METADATA = "key-metadata";
+  private static final String SPLIT_OFFSETS = "split-offsets";
+  private static final String EQUALITY_IDS = "equality-ids";
+  private static final String SORT_ORDER_ID = "sort-order-id";
+
+  private ContentFileParser() {}
+
+  private static boolean hasPartitionData(StructLike partitionData) {
+    return partitionData != null && partitionData.size() > 0;
+  }
+
+  static String toJson(ContentFile<?> contentFile, PartitionSpec spec) {
+    return JsonUtil.generate(
+        generator -> ContentFileParser.toJson(contentFile, spec, generator), 
false);
+  }
+
+  static void toJson(ContentFile<?> contentFile, PartitionSpec spec, 
JsonGenerator generator)
+      throws IOException {
+    Preconditions.checkArgument(contentFile != null, "Invalid content file: 
null");
+    Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+    Preconditions.checkArgument(
+        contentFile.specId() == spec.specId(),
+        "Invalid partition spec id from content file: expected = %s, actual = 
%s",
+        spec.specId(),
+        contentFile.specId());
+    Preconditions.checkArgument(
+        spec.isPartitioned() == hasPartitionData(contentFile.partition()),
+        "Invalid partition data from content file: expected = %s, actual = %s",
+        spec.isPartitioned() ? "partitioned" : "unpartitioned",
+        hasPartitionData(contentFile.partition()) ? "partitioned" : 
"unpartitioned");
+
+    generator.writeStartObject();
+
+    // ignore the ordinal position (ContentFile#pos) of the file in a manifest,
+    // as it isn't used and BaseFile constructor doesn't support it.
+
+    generator.writeNumberField(SPEC_ID, contentFile.specId());
+    generator.writeStringField(CONTENT, contentFile.content().name());
+    generator.writeStringField(FILE_PATH, contentFile.path().toString());
+    generator.writeStringField(FILE_FORMAT, contentFile.format().name());
+
+    if (contentFile.partition() != null) {
+      generator.writeFieldName(PARTITION);
+      SingleValueParser.toJson(spec.partitionType(), contentFile.partition(), 
generator);
+    }
+
+    generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes());
+
+    metricsToJson(contentFile, generator);
+
+    if (contentFile.keyMetadata() != null) {
+      generator.writeFieldName(KEY_METADATA);
+      SingleValueParser.toJson(DataFile.KEY_METADATA.type(), 
contentFile.keyMetadata(), generator);
+    }
+
+    if (contentFile.splitOffsets() != null) {
+      JsonUtil.writeLongArray(SPLIT_OFFSETS, contentFile.splitOffsets(), 
generator);
+    }
+
+    if (contentFile.equalityFieldIds() != null) {
+      JsonUtil.writeIntegerArray(EQUALITY_IDS, contentFile.equalityFieldIds(), 
generator);
+    }
+
+    if (contentFile.sortOrderId() != null) {
+      generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
+    }
+
+    generator.writeEndObject();
+  }
+
+  static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for 
content file: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for content file: non-object 
(%s)", jsonNode);
+    Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
+
+    int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
+    FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, 
jsonNode));
+    String filePath = JsonUtil.getString(FILE_PATH, jsonNode);
+    FileFormat fileFormat = 
FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode));
+
+    PartitionData partitionData = null;
+    if (jsonNode.has(PARTITION)) {
+      partitionData = new PartitionData(spec.partitionType());
+      StructLike structLike =
+          (StructLike) SingleValueParser.fromJson(spec.partitionType(), 
jsonNode.get(PARTITION));
+      Preconditions.checkState(
+          partitionData.size() == structLike.size(),
+          "Invalid partition data size: expected = %s, actual = %s",
+          partitionData.size(),
+          structLike.size());
+      for (int pos = 0; pos < partitionData.size(); ++pos) {
+        Class<?> javaClass = 
spec.partitionType().fields().get(pos).type().typeId().javaClass();
+        partitionData.set(pos, structLike.get(pos, javaClass));
+      }
+    }
+
+    long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode);
+    Metrics metrics = metricsFromJson(jsonNode);
+    ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, 
jsonNode);
+    List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, 
jsonNode);
+    int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, 
jsonNode);
+    Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);
+
+    if (fileContent == FileContent.DATA) {
+      return new GenericDataFile(
+          specId,
+          filePath,
+          fileFormat,
+          partitionData,
+          fileSizeInBytes,
+          metrics,
+          keyMetadata,
+          splitOffsets,
+          equalityFieldIds,
+          sortOrderId);
+    } else {
+      return new GenericDeleteFile(
+          specId,
+          fileContent,
+          filePath,
+          fileFormat,
+          partitionData,
+          fileSizeInBytes,
+          metrics,
+          equalityFieldIds,
+          sortOrderId,
+          splitOffsets,
+          keyMetadata);
+    }
+  }
+
+  private static void metricsToJson(ContentFile<?> contentFile, JsonGenerator 
generator)
+      throws IOException {
+    generator.writeNumberField(RECORD_COUNT, contentFile.recordCount());
+
+    if (contentFile.columnSizes() != null) {
+      generator.writeFieldName(COLUMN_SIZES);
+      SingleValueParser.toJson(DataFile.COLUMN_SIZES.type(), 
contentFile.columnSizes(), generator);
+    }
+
+    if (contentFile.valueCounts() != null) {
+      generator.writeFieldName(VALUE_COUNTS);
+      SingleValueParser.toJson(DataFile.VALUE_COUNTS.type(), 
contentFile.valueCounts(), generator);
+    }
+
+    if (contentFile.nullValueCounts() != null) {
+      generator.writeFieldName(NULL_VALUE_COUNTS);
+      SingleValueParser.toJson(
+          DataFile.NULL_VALUE_COUNTS.type(), contentFile.nullValueCounts(), 
generator);
+    }
+
+    if (contentFile.nullValueCounts() != null) {
+      generator.writeFieldName(NAN_VALUE_COUNTS);
+      SingleValueParser.toJson(
+          DataFile.NAN_VALUE_COUNTS.type(), contentFile.nanValueCounts(), 
generator);
+    }
+
+    if (contentFile.lowerBounds() != null) {
+      generator.writeFieldName(LOWER_BOUNDS);
+      SingleValueParser.toJson(DataFile.LOWER_BOUNDS.type(), 
contentFile.lowerBounds(), generator);
+    }
+
+    if (contentFile.upperBounds() != null) {
+      generator.writeFieldName(UPPER_BOUNDS);
+      SingleValueParser.toJson(DataFile.UPPER_BOUNDS.type(), 
contentFile.upperBounds(), generator);
+    }
+  }
+
+  private static Metrics metricsFromJson(JsonNode jsonNode) {
+    long recordCount = JsonUtil.getLong(RECORD_COUNT, jsonNode);
+
+    Map<Integer, Long> columnSizes = null;
+    if (jsonNode.has(COLUMN_SIZES)) {
+      columnSizes =
+          (Map<Integer, Long>)
+              SingleValueParser.fromJson(DataFile.COLUMN_SIZES.type(), 
jsonNode.get(COLUMN_SIZES));
+    }
+
+    Map<Integer, Long> valueCounts = null;
+    if (jsonNode.has(VALUE_COUNTS)) {
+      valueCounts =
+          (Map<Integer, Long>)
+              SingleValueParser.fromJson(DataFile.VALUE_COUNTS.type(), 
jsonNode.get(VALUE_COUNTS));
+    }
+
+    Map<Integer, Long> nullValueCounts = null;
+    if (jsonNode.has(NULL_VALUE_COUNTS)) {
+      nullValueCounts =
+          (Map<Integer, Long>)
+              SingleValueParser.fromJson(
+                  DataFile.NULL_VALUE_COUNTS.type(), 
jsonNode.get(NULL_VALUE_COUNTS));
+    }
+
+    Map<Integer, Long> nanValueCounts = null;
+    if (jsonNode.has(NAN_VALUE_COUNTS)) {
+      nanValueCounts =
+          (Map<Integer, Long>)
+              SingleValueParser.fromJson(
+                  DataFile.NAN_VALUE_COUNTS.type(), 
jsonNode.get(NAN_VALUE_COUNTS));
+    }
+
+    Map<Integer, ByteBuffer> lowerBounds = null;
+    if (jsonNode.has(LOWER_BOUNDS)) {
+      lowerBounds =
+          (Map<Integer, ByteBuffer>)
+              SingleValueParser.fromJson(DataFile.LOWER_BOUNDS.type(), 
jsonNode.get(LOWER_BOUNDS));
+    }
+
+    Map<Integer, ByteBuffer> upperBounds = null;
+    if (jsonNode.has(UPPER_BOUNDS)) {
+      upperBounds =
+          (Map<Integer, ByteBuffer>)
+              SingleValueParser.fromJson(DataFile.UPPER_BOUNDS.type(), 
jsonNode.get(UPPER_BOUNDS));
+    }
+
+    return new Metrics(
+        recordCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java 
b/core/src/main/java/org/apache/iceberg/DataFiles.java
index ef95c0bdf6..95b2891c98 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 
 public class DataFiles {
@@ -123,7 +124,6 @@ public class DataFiles {
     private FileFormat format = null;
     private long recordCount = -1L;
     private long fileSizeInBytes = -1L;
-    private Integer sortOrderId = SortOrder.unsorted().orderId();
 
     // optional fields
     private Map<Integer, Long> columnSizes = null;
@@ -134,6 +134,8 @@ public class DataFiles {
     private Map<Integer, ByteBuffer> upperBounds = null;
     private ByteBuffer keyMetadata = null;
     private List<Long> splitOffsets = null;
+    private List<Integer> equalityFieldIds = null;
+    private Integer sortOrderId = SortOrder.unsorted().orderId();
 
     public Builder(PartitionSpec spec) {
       this.spec = spec;
@@ -269,6 +271,14 @@ public class DataFiles {
       return this;
     }
 
+    public Builder withEqualityFieldIds(List<Integer> equalityIds) {
+      if (equalityIds != null) {
+        this.equalityFieldIds = ImmutableList.copyOf(equalityIds);
+      }
+
+      return this;
+    }
+
     public Builder withEncryptionKeyMetadata(ByteBuffer newKeyMetadata) {
       this.keyMetadata = newKeyMetadata;
       return this;
@@ -310,6 +320,7 @@ public class DataFiles {
               upperBounds),
           keyMetadata,
           splitOffsets,
+          ArrayUtil.toIntArray(equalityFieldIds),
           sortOrderId);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java 
b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
new file mode 100644
index 0000000000..b747eff98b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/FileScanTaskParser.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FileScanTaskParser {
+  private static final String SCHEMA = "schema";
+  private static final String SPEC = "spec";
+  private static final String DATA_FILE = "data-file";
+  private static final String DELETE_FILES = "delete-files";
+  private static final String RESIDUAL = "residual-filter";
+
+  private FileScanTaskParser() {}
+
+  public static String toJson(FileScanTask fileScanTask) {
+    return JsonUtil.generate(
+        generator -> FileScanTaskParser.toJson(fileScanTask, generator), 
false);
+  }
+
+  private static void toJson(FileScanTask fileScanTask, JsonGenerator 
generator)
+      throws IOException {
+    Preconditions.checkArgument(fileScanTask != null, "Invalid file scan task: 
null");
+    Preconditions.checkArgument(generator != null, "Invalid JSON generator: 
null");
+    generator.writeStartObject();
+
+    generator.writeFieldName(SCHEMA);
+    SchemaParser.toJson(fileScanTask.schema(), generator);
+
+    generator.writeFieldName(SPEC);
+    PartitionSpec spec = fileScanTask.spec();
+    PartitionSpecParser.toJson(spec, generator);
+
+    if (fileScanTask.file() != null) {
+      generator.writeFieldName(DATA_FILE);
+      ContentFileParser.toJson(fileScanTask.file(), spec, generator);
+    }
+
+    if (fileScanTask.deletes() != null) {
+      generator.writeArrayFieldStart(DELETE_FILES);
+      for (DeleteFile deleteFile : fileScanTask.deletes()) {
+        ContentFileParser.toJson(deleteFile, spec, generator);
+      }
+      generator.writeEndArray();
+    }
+
+    if (fileScanTask.residual() != null) {
+      generator.writeFieldName(RESIDUAL);
+      ExpressionParser.toJson(fileScanTask.residual(), generator);
+    }
+
+    generator.writeEndObject();
+  }
+
+  public static FileScanTask fromJson(String json, boolean caseSensitive) {
+    Preconditions.checkArgument(json != null, "Invalid JSON string for file 
scan task: null");
+    return JsonUtil.parse(json, node -> FileScanTaskParser.fromJson(node, 
caseSensitive));
+  }
+
+  private static FileScanTask fromJson(JsonNode jsonNode, boolean 
caseSensitive) {
+    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for file 
scan task: null");
+    Preconditions.checkArgument(
+        jsonNode.isObject(), "Invalid JSON node for file scan task: non-object 
(%s)", jsonNode);
+
+    Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));
+    String schemaString = SchemaParser.toJson(schema);
+
+    PartitionSpec spec = PartitionSpecParser.fromJson(schema, 
JsonUtil.get(SPEC, jsonNode));
+    String specString = PartitionSpecParser.toJson(spec);
+
+    DataFile dataFile = null;
+    if (jsonNode.has(DATA_FILE)) {
+      dataFile = (DataFile) 
ContentFileParser.fromJson(jsonNode.get(DATA_FILE), spec);
+    }
+
+    DeleteFile[] deleteFiles = null;
+    if (jsonNode.has(DELETE_FILES)) {
+      JsonNode deletesArray = jsonNode.get(DELETE_FILES);
+      Preconditions.checkArgument(
+          deletesArray.isArray(),
+          "Invalid JSON node for delete files: non-array (%s)",
+          deletesArray);
+      // parse the schema array
+      ImmutableList.Builder<DeleteFile> builder = ImmutableList.builder();
+      for (JsonNode deleteFileNode : deletesArray) {
+        DeleteFile deleteFile = (DeleteFile) 
ContentFileParser.fromJson(deleteFileNode, spec);
+        builder.add(deleteFile);
+      }
+
+      deleteFiles = builder.build().toArray(new DeleteFile[0]);
+    }
+
+    Expression filter = Expressions.alwaysTrue();
+    if (jsonNode.has(RESIDUAL)) {
+      filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL));
+    }
+
+    ResidualEvaluator residualEvaluator = ResidualEvaluator.of(spec, filter, 
caseSensitive);
+    return new BaseFileScanTask(dataFile, deleteFiles, schemaString, 
specString, residualEvaluator);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java 
b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 34c65e669f..07c5172f1b 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -40,6 +40,7 @@ class GenericDataFile extends BaseFile<DataFile> implements 
DataFile {
       Metrics metrics,
       ByteBuffer keyMetadata,
       List<Long> splitOffsets,
+      int[] equalityFieldIds,
       Integer sortOrderId) {
     super(
         specId,
@@ -56,7 +57,7 @@ class GenericDataFile extends BaseFile<DataFile> implements 
DataFile {
         metrics.lowerBounds(),
         metrics.upperBounds(),
         splitOffsets,
-        null,
+        equalityFieldIds,
         sortOrderId,
         keyMetadata);
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java 
b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 84c0681164..aa90c63f80 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -26,14 +26,17 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
 
 public class JsonUtil {
 
@@ -173,12 +176,24 @@ public class JsonUtil {
     return getString(property, node);
   }
 
+  public static ByteBuffer getByteBufferOrNull(String property, JsonNode node) 
{
+    if (!node.has(property) || node.get(property).isNull()) {
+      return null;
+    }
+
+    JsonNode pNode = node.get(property);
+    Preconditions.checkArgument(
+        pNode.isTextual(), "Cannot parse byte buffer from non-text value: %s: 
%s", property, pNode);
+    return ByteBuffer.wrap(
+        
BaseEncoding.base16().decode(pNode.textValue().toUpperCase(Locale.ROOT)));
+  }
+
   public static Map<String, String> getStringMap(String property, JsonNode 
node) {
     Preconditions.checkArgument(node.has(property), "Cannot parse missing map: 
%s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(
         pNode != null && !pNode.isNull() && pNode.isObject(),
-        "Cannot parse from non-object value: %s: %s",
+        "Cannot parse string map from non-object value: %s: %s",
         property,
         pNode);
 
@@ -229,6 +244,14 @@ public class JsonUtil {
         .build();
   }
 
+  public static int[] getIntArrayOrNull(String property, JsonNode node) {
+    if (!node.has(property) || node.get(property).isNull()) {
+      return null;
+    }
+
+    return ArrayUtil.toIntArray(getIntegerList(property, node));
+  }
+
   public static List<Integer> getIntegerList(String property, JsonNode node) {
     Preconditions.checkArgument(node.has(property), "Cannot parse missing 
list: %s", property);
     return ImmutableList.<Integer>builder()
@@ -256,6 +279,14 @@ public class JsonUtil {
     return ImmutableList.<Long>builder().addAll(new 
JsonLongArrayIterator(property, node)).build();
   }
 
+  public static List<Long> getLongListOrNull(String property, JsonNode node) {
+    if (!node.has(property) || node.get(property).isNull()) {
+      return null;
+    }
+
+    return ImmutableList.<Long>builder().addAll(new 
JsonLongArrayIterator(property, node)).build();
+  }
+
   public static Set<Long> getLongSetOrNull(String property, JsonNode node) {
     if (!node.hasNonNull(property)) {
       return null;
@@ -291,7 +322,7 @@ public class JsonUtil {
       JsonNode pNode = node.get(property);
       Preconditions.checkArgument(
           pNode != null && !pNode.isNull() && pNode.isArray(),
-          "Cannot parse from non-array value: %s: %s",
+          "Cannot parse JSON array from non-array value: %s: %s",
           property,
           pNode);
       this.elements = pNode.elements();
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java 
b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 038dfadbff..a800214bc9 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -57,7 +57,7 @@ public class TableTestBase {
   protected static final int BUCKETS_NUMBER = 16;
 
   // Partition spec used to create tables
-  protected static final PartitionSpec SPEC =
+  public static final PartitionSpec SPEC =
       PartitionSpec.builderFor(SCHEMA).bucket("data", BUCKETS_NUMBER).build();
 
   static final DataFile FILE_A =
diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java 
b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
new file mode 100644
index 0000000000..9360f571c5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class TestContentFileParser {
+  @Test
+  public void testNullArguments() throws Exception {
+    Assertions.assertThatThrownBy(() -> ContentFileParser.toJson(null, 
TableTestBase.SPEC))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid content file: null");
+
+    Assertions.assertThatThrownBy(() -> 
ContentFileParser.toJson(TableTestBase.FILE_A, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid partition spec: null");
+
+    Assertions.assertThatThrownBy(
+            () -> ContentFileParser.toJson(TableTestBase.FILE_A, 
TableTestBase.SPEC, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON generator: null");
+
+    Assertions.assertThatThrownBy(() -> ContentFileParser.fromJson(null, 
TableTestBase.SPEC))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON node for content file: null");
+
+    String jsonStr = ContentFileParser.toJson(TableTestBase.FILE_A, 
TableTestBase.SPEC);
+    JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
+    Assertions.assertThatThrownBy(() -> ContentFileParser.fromJson(jsonNode, 
null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid partition spec: null");
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideSpecAndDataFile")
+  public void testDataFile(PartitionSpec spec, DataFile dataFile, String 
expectedJson)
+      throws Exception {
+    String jsonStr = ContentFileParser.toJson(dataFile, spec);
+    Assertions.assertThat(jsonStr).isEqualTo(expectedJson);
+    JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
+    ContentFile<?> deserializedContentFile = 
ContentFileParser.fromJson(jsonNode, spec);
+    
Assertions.assertThat(deserializedContentFile).isInstanceOf(DataFile.class);
+    assertContentFileEquals(dataFile, deserializedContentFile, spec);
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideSpecAndDeleteFile")
+  public void testDeleteFile(PartitionSpec spec, DeleteFile deleteFile, String 
expectedJson)
+      throws Exception {
+    String jsonStr = ContentFileParser.toJson(deleteFile, spec);
+    Assertions.assertThat(jsonStr).isEqualTo(expectedJson);
+    JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
+    ContentFile<?> deserializedContentFile = 
ContentFileParser.fromJson(jsonNode, spec);
+    
Assertions.assertThat(deserializedContentFile).isInstanceOf(DeleteFile.class);
+    assertContentFileEquals(deleteFile, deserializedContentFile, spec);
+  }
+
+  private static Stream<Arguments> provideSpecAndDataFile() {
+    return Stream.of(
+        Arguments.of(
+            PartitionSpec.unpartitioned(),
+            dataFileWithRequiredOnly(PartitionSpec.unpartitioned()),
+            dataFileJsonWithRequiredOnly(PartitionSpec.unpartitioned())),
+        Arguments.of(
+            PartitionSpec.unpartitioned(),
+            dataFileWithAllOptional(PartitionSpec.unpartitioned()),
+            dataFileJsonWithAllOptional(PartitionSpec.unpartitioned())),
+        Arguments.of(
+            TableTestBase.SPEC,
+            dataFileWithRequiredOnly(TableTestBase.SPEC),
+            dataFileJsonWithRequiredOnly(TableTestBase.SPEC)),
+        Arguments.of(
+            TableTestBase.SPEC,
+            dataFileWithAllOptional(TableTestBase.SPEC),
+            dataFileJsonWithAllOptional(TableTestBase.SPEC)));
+  }
+
+  private static DataFile dataFileWithRequiredOnly(PartitionSpec spec) {
+    DataFiles.Builder builder =
+        DataFiles.builder(spec)
+            .withPath("/path/to/data-a.parquet")
+            .withFileSizeInBytes(10)
+            .withRecordCount(1);
+
+    if (spec.isPartitioned()) {
+      // easy way to set partition data for now
+      builder.withPartitionPath("data_bucket=1");
+    }
+
+    return builder.build();
+  }
+
+  private static String dataFileJsonWithRequiredOnly(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return 
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
+          + 
"\"partition\":{},\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
+    } else {
+      return 
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
+          + 
"\"partition\":{\"1000\":1},\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
+    }
+  }
+
+  private static String dataFileJsonWithAllOptional(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return 
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+          + 
"\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":350,\"record-count\":10,"
+          + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+          + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+          + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+          + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+          + 
"\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+          + 
"\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+          + "\"key-metadata\":\"00000000000000000000000000000000\","
+          + 
"\"split-offsets\":[128,256],\"equality-ids\":[1],\"sort-order-id\":1}";
+    } else {
+      return 
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+          + 
"\"file-format\":\"PARQUET\",\"partition\":{\"1000\":1},\"file-size-in-bytes\":350,\"record-count\":10,"
+          + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+          + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+          + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+          + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+          + 
"\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+          + 
"\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+          + "\"key-metadata\":\"00000000000000000000000000000000\","
+          + 
"\"split-offsets\":[128,256],\"equality-ids\":[1],\"sort-order-id\":1}";
+    }
+  }
+
+  private static DataFile dataFileWithAllOptional(PartitionSpec spec) {
+    DataFiles.Builder builder =
+        DataFiles.builder(spec)
+            .withPath("/path/to/data-with-stats.parquet")
+            .withMetrics(
+                new Metrics(
+                    10L, // record count
+                    ImmutableMap.of(3, 100L, 4, 200L), // column sizes
+                    ImmutableMap.of(3, 90L, 4, 180L), // value counts
+                    ImmutableMap.of(3, 10L, 4, 20L), // null value counts
+                    ImmutableMap.of(3, 0L, 4, 0L), // nan value counts
+                    ImmutableMap.of(
+                        3,
+                        Conversions.toByteBuffer(Types.IntegerType.get(), 1),
+                        4,
+                        Conversions.toByteBuffer(Types.IntegerType.get(), 2)), 
// lower bounds
+                    ImmutableMap.of(
+                        3,
+                        Conversions.toByteBuffer(Types.IntegerType.get(), 5),
+                        4,
+                        Conversions.toByteBuffer(Types.IntegerType.get(), 10)) 
// upperbounds
+                    ))
+            .withFileSizeInBytes(350)
+            .withSplitOffsets(Arrays.asList(128L, 256L))
+            .withEqualityFieldIds(Collections.singletonList(1))
+            .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[16]))
+            .withSortOrder(
+                SortOrder.builderFor(TableTestBase.SCHEMA)
+                    .withOrderId(1)
+                    .sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST)
+                    .build());
+
+    if (spec.isPartitioned()) {
+      // easy way to set partition data for now
+      builder.withPartitionPath("data_bucket=1");
+    }
+
+    return builder.build();
+  }
+
+  private static Stream<Arguments> provideSpecAndDeleteFile() {
+    return Stream.of(
+        Arguments.of(
+            PartitionSpec.unpartitioned(),
+            deleteFileWithRequiredOnly(PartitionSpec.unpartitioned()),
+            deleteFileJsonWithRequiredOnly(PartitionSpec.unpartitioned())),
+        Arguments.of(
+            PartitionSpec.unpartitioned(),
+            deleteFileWithAllOptional(PartitionSpec.unpartitioned()),
+            deleteFileJsonWithAllOptional(PartitionSpec.unpartitioned())),
+        Arguments.of(
+            TableTestBase.SPEC,
+            deleteFileWithRequiredOnly(TableTestBase.SPEC),
+            deleteFileJsonWithRequiredOnly(TableTestBase.SPEC)),
+        Arguments.of(
+            TableTestBase.SPEC,
+            deleteFileWithAllOptional(TableTestBase.SPEC),
+            deleteFileJsonWithAllOptional(TableTestBase.SPEC)));
+  }
+
+  private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) {
+    PartitionData partitionData = null;
+    if (spec.isPartitioned()) {
+      partitionData = new PartitionData(spec.partitionType());
+      partitionData.set(0, 9);
+    }
+
+    return new GenericDeleteFile(
+        spec.specId(),
+        FileContent.POSITION_DELETES,
+        "/path/to/delete-a.parquet",
+        FileFormat.PARQUET,
+        partitionData,
+        1234,
+        new Metrics(9L, null, null, null, null),
+        null,
+        null,
+        null,
+        null);
+  }
+
+  private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) {
+    PartitionData partitionData = new PartitionData(spec.partitionType());
+    if (spec.isPartitioned()) {
+      partitionData.set(0, 9);
+    }
+
+    Metrics metrics =
+        new Metrics(
+            10L, // record count
+            ImmutableMap.of(3, 100L, 4, 200L), // column sizes
+            ImmutableMap.of(3, 90L, 4, 180L), // value counts
+            ImmutableMap.of(3, 10L, 4, 20L), // null value counts
+            ImmutableMap.of(3, 0L, 4, 0L), // nan value counts
+            ImmutableMap.of(
+                3,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 1),
+                4,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 2)), // 
lower bounds
+            ImmutableMap.of(
+                3,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 5),
+                4,
+                Conversions.toByteBuffer(Types.IntegerType.get(), 10)) // 
upperbounds
+            );
+
+    return new GenericDeleteFile(
+        spec.specId(),
+        FileContent.EQUALITY_DELETES,
+        "/path/to/delete-with-stats.parquet",
+        FileFormat.PARQUET,
+        partitionData,
+        1234,
+        metrics,
+        new int[] {3},
+        1,
+        Arrays.asList(128L),
+        ByteBuffer.wrap(new byte[16]));
+  }
+
+  private static String deleteFileJsonWithRequiredOnly(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return 
"{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\","
+          + 
"\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":1234,\"record-count\":9}";
+    } else {
+      return 
"{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\","
+          + 
"\"file-format\":\"PARQUET\",\"partition\":{\"1000\":9},\"file-size-in-bytes\":1234,\"record-count\":9}";
+    }
+  }
+
+  private static String deleteFileJsonWithAllOptional(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {
+      return 
"{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+          + 
"\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":1234,\"record-count\":10,"
+          + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+          + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+          + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+          + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+          + 
"\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+          + 
"\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+          + "\"key-metadata\":\"00000000000000000000000000000000\","
+          + 
"\"split-offsets\":[128],\"equality-ids\":[3],\"sort-order-id\":1}";
+    } else {
+      return 
"{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+          + 
"\"file-format\":\"PARQUET\",\"partition\":{\"1000\":9},\"file-size-in-bytes\":1234,\"record-count\":10,"
+          + "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+          + "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+          + "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
+          + "\"nan-value-counts\":{\"keys\":[3,4],\"values\":[0,0]},"
+          + 
"\"lower-bounds\":{\"keys\":[3,4],\"values\":[\"01000000\",\"02000000\"]},"
+          + 
"\"upper-bounds\":{\"keys\":[3,4],\"values\":[\"05000000\",\"0A000000\"]},"
+          + "\"key-metadata\":\"00000000000000000000000000000000\","
+          + 
"\"split-offsets\":[128],\"equality-ids\":[3],\"sort-order-id\":1}";
+    }
+  }
+
+  static void assertContentFileEquals(
+      ContentFile<?> expected, ContentFile<?> actual, PartitionSpec spec) {
+    Assertions.assertThat(actual.getClass()).isEqualTo(expected.getClass());
+    Assertions.assertThat(actual.specId()).isEqualTo(expected.specId());
+    Assertions.assertThat(actual.content()).isEqualTo(expected.content());
+    Assertions.assertThat(actual.path()).isEqualTo(expected.path());
+    Assertions.assertThat(actual.format()).isEqualTo(expected.format());
+    Assertions.assertThat(actual.partition())
+        .usingComparator(Comparators.forType(spec.partitionType()))
+        .isEqualTo(expected.partition());
+    
Assertions.assertThat(actual.recordCount()).isEqualTo(expected.recordCount());
+    
Assertions.assertThat(actual.fileSizeInBytes()).isEqualTo(expected.fileSizeInBytes());
+    
Assertions.assertThat(actual.columnSizes()).isEqualTo(expected.columnSizes());
+    
Assertions.assertThat(actual.valueCounts()).isEqualTo(expected.valueCounts());
+    
Assertions.assertThat(actual.nullValueCounts()).isEqualTo(expected.nullValueCounts());
+    
Assertions.assertThat(actual.nanValueCounts()).isEqualTo(expected.nanValueCounts());
+    
Assertions.assertThat(actual.lowerBounds()).isEqualTo(expected.lowerBounds());
+    
Assertions.assertThat(actual.upperBounds()).isEqualTo(expected.upperBounds());
+    
Assertions.assertThat(actual.keyMetadata()).isEqualTo(expected.keyMetadata());
+    
Assertions.assertThat(actual.splitOffsets()).isEqualTo(expected.splitOffsets());
+    
Assertions.assertThat(actual.equalityFieldIds()).isEqualTo(expected.equalityFieldIds());
+    
Assertions.assertThat(actual.sortOrderId()).isEqualTo(expected.sortOrderId());
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java 
b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
new file mode 100644
index 0000000000..221a5507b1
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFileScanTaskParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestFileScanTaskParser {
+  @Test
+  public void testNullArguments() {
+    Assertions.assertThatThrownBy(() -> FileScanTaskParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid file scan task: null");
+
+    Assertions.assertThatThrownBy(() -> FileScanTaskParser.fromJson(null, 
true))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid JSON string for file scan task: null");
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testParser(boolean caseSensitive) {
+    PartitionSpec spec = TableTestBase.SPEC;
+    FileScanTask fileScanTask = createScanTask(spec, caseSensitive);
+    String jsonStr = FileScanTaskParser.toJson(fileScanTask);
+    Assertions.assertThat(jsonStr).isEqualTo(expectedFileScanTaskJson());
+    FileScanTask deserializedTask = FileScanTaskParser.fromJson(jsonStr, 
caseSensitive);
+    assertFileScanTaskEquals(fileScanTask, deserializedTask, spec, 
caseSensitive);
+  }
+
+  private FileScanTask createScanTask(PartitionSpec spec, boolean 
caseSensitive) {
+    ResidualEvaluator residualEvaluator;
+    if (spec.isUnpartitioned()) {
+      residualEvaluator = 
ResidualEvaluator.unpartitioned(Expressions.alwaysTrue());
+    } else {
+      residualEvaluator = ResidualEvaluator.of(spec, Expressions.equal("id", 
1), caseSensitive);
+    }
+
+    return new BaseFileScanTask(
+        TableTestBase.FILE_A,
+        new DeleteFile[] {TableTestBase.FILE_A_DELETES, 
TableTestBase.FILE_A2_DELETES},
+        SchemaParser.toJson(TableTestBase.SCHEMA),
+        PartitionSpecParser.toJson(spec),
+        residualEvaluator);
+  }
+
+  private String expectedFileScanTaskJson() {
+    return "{\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":["
+        + "{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"},"
+        + 
"{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]},"
+        + "\"spec\":{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\","
+        + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]},"
+        + 
"\"data-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\","
+        + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},"
+        + "\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0},"
+        + "\"delete-files\":[{\"spec-id\":0,\"content\":\"POSITION_DELETES\","
+        + 
"\"file-path\":\"/path/to/data-a-deletes.parquet\",\"file-format\":\"PARQUET\","
+        + 
"\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,\"record-count\":1},"
+        + 
"{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/data-a2-deletes.parquet\","
+        + 
"\"file-format\":\"PARQUET\",\"partition\":{\"1000\":0},\"file-size-in-bytes\":10,"
+        + "\"record-count\":1,\"equality-ids\":[1],\"sort-order-id\":0}],"
+        + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}}";
+  }
+
+  private static void assertFileScanTaskEquals(
+      FileScanTask expected, FileScanTask actual, PartitionSpec spec, boolean 
caseSensitive) {
+    TestContentFileParser.assertContentFileEquals(expected.file(), 
actual.file(), spec);
+    Assertions.assertThat(actual.deletes()).hasSameSizeAs(expected.deletes());
+    for (int pos = 0; pos < expected.deletes().size(); ++pos) {
+      TestContentFileParser.assertContentFileEquals(
+          expected.deletes().get(pos), actual.deletes().get(pos), spec);
+    }
+
+    Assertions.assertThat(expected.schema().sameSchema(actual.schema()))
+        .as("Schema should match")
+        .isTrue();
+    Assertions.assertThat(actual.spec()).isEqualTo(expected.spec());
+    Assertions.assertThat(
+            ExpressionUtil.equivalent(
+                expected.residual(),
+                actual.residual(),
+                TableTestBase.SCHEMA.asStruct(),
+                caseSensitive))
+        .as("Residual expression should match")
+        .isTrue();
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java 
b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 740791b255..08b27d7460 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -78,7 +78,7 @@ public class TestManifestWriterVersions {
 
   private static final DataFile DATA_FILE =
       new GenericDataFile(
-          0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, 
SORT_ORDER_ID);
+          0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, null, 
SORT_ORDER_ID);
 
   private static final List<Integer> EQUALITY_IDS = ImmutableList.of(1);
   private static final int[] EQUALITY_ID_ARR = new int[] {1};
diff --git 
a/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java 
b/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
index 04972e5cd6..587b9395f5 100644
--- 
a/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
+++ 
b/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
@@ -95,7 +95,7 @@ public class TestTableIdentifierParser {
     String invalidNamespace = 
"{\"namespace\":\"accounting.tax\",\"name\":\"paid\"}";
     Assertions.assertThatThrownBy(() -> 
TableIdentifierParser.fromJson(invalidNamespace))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: namespace: 
\"accounting.tax\"");
+        .hasMessage("Cannot parse JSON array from non-array value: namespace: 
\"accounting.tax\"");
 
     String invalidName = 
"{\"namespace\":[\"accounting\",\"tax\"],\"name\":1234}";
     Assertions.assertThatThrownBy(() -> 
TableIdentifierParser.fromJson(invalidName))
diff --git 
a/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
 
b/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
index 9e8bffaf6d..116d43a6d1 100644
--- 
a/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
+++ 
b/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java
@@ -67,7 +67,8 @@ public class TestListTablesResponse extends 
RequestResponseTestBase<ListTablesRe
         
"{\"identifiers\":[{\"namespace\":\"accounting.tax\",\"name\":\"paid\"}]}";
     Assertions.assertThatThrownBy(() -> 
deserialize(jsonWithInvalidIdentifiersInList))
         .isInstanceOf(JsonProcessingException.class)
-        .hasMessageContaining("Cannot parse from non-array value");
+        .hasMessageContaining(
+            "Cannot parse JSON array from non-array value: namespace: 
\"accounting.tax\"");
 
     String jsonWithInvalidIdentifiersInList2 =
         
"{\"identifiers\":[{\"namespace\":[\"accounting\",\"tax\"],\"name\":\"paid\"},\"accounting.tax.paid\"]}";
diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java 
b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
index 8a769fccb9..75191f4d70 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.util;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
@@ -167,6 +169,26 @@ public class TestJsonUtil {
         .hasMessage("Cannot parse to a string value: x: 23");
   }
 
+  @Test
+  public void getByteBufferOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getByteBufferOrNull("x", 
JsonUtil.mapper().readTree("{}")))
+        .isNull();
+    Assertions.assertThat(
+            JsonUtil.getByteBufferOrNull("x", 
JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isNull();
+
+    byte[] bytes = new byte[] {1, 2, 3, 4};
+    String base16Str = BaseEncoding.base16().encode(bytes);
+    String json = String.format("{\"x\": \"%s\"}", base16Str);
+    ByteBuffer byteBuffer = JsonUtil.getByteBufferOrNull("x", 
JsonUtil.mapper().readTree(json));
+    Assertions.assertThat(byteBuffer.array()).isEqualTo(bytes);
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getByteBufferOrNull("x", 
JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse byte buffer from non-text value: x: 23");
+  }
+
   @Test
   public void getBool() throws JsonProcessingException {
     Assertions.assertThatThrownBy(() -> JsonUtil.getBool("x", 
JsonUtil.mapper().readTree("{}")))
@@ -194,6 +216,28 @@ public class TestJsonUtil {
         .isFalse();
   }
 
+  @Test
+  public void getIntArrayOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getIntArrayOrNull("items", 
JsonUtil.mapper().readTree("{}")))
+        .isNull();
+
+    Assertions.assertThat(
+            JsonUtil.getIntArrayOrNull("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                JsonUtil.getIntArrayOrNull(
+                    "items", JsonUtil.mapper().readTree("{\"items\": [13, 
\"23\"]}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse integer from non-int value in items: 
\"23\"");
+
+    Assertions.assertThat(
+            JsonUtil.getIntArrayOrNull(
+                "items", JsonUtil.mapper().readTree("{\"items\": [23, 45]}")))
+        .isEqualTo(new int[] {23, 45});
+  }
+
   @Test
   public void getIntegerList() throws JsonProcessingException {
     Assertions.assertThatThrownBy(
@@ -204,7 +248,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getIntegerList("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: items: null");
+        .hasMessage("Cannot parse JSON array from non-array value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -240,7 +284,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getIntegerSet("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: items: null");
+        .hasMessage("Cannot parse JSON array from non-array value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -286,7 +330,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getLongList("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: items: null");
+        .hasMessage("Cannot parse JSON array from non-array value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -312,6 +356,28 @@ public class TestJsonUtil {
         .isEqualTo(items);
   }
 
+  @Test
+  public void getLongListOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getLongListOrNull("items", 
JsonUtil.mapper().readTree("{}")))
+        .isNull();
+
+    Assertions.assertThat(
+            JsonUtil.getLongListOrNull("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                JsonUtil.getLongListOrNull(
+                    "items", JsonUtil.mapper().readTree("{\"items\": [13, 
\"23\"]}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse long from non-long value in items: \"23\"");
+
+    Assertions.assertThat(
+            JsonUtil.getLongListOrNull(
+                "items", JsonUtil.mapper().readTree("{\"items\": [23, 45]}")))
+        .containsExactlyElementsOf(Arrays.asList(23L, 45L));
+  }
+
   @Test
   public void getLongSet() throws JsonProcessingException {
     Assertions.assertThatThrownBy(
@@ -322,7 +388,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getLongSet("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: items: null");
+        .hasMessage("Cannot parse JSON array from non-array value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -367,7 +433,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getStringList("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: items: null");
+        .hasMessage("Cannot parse JSON array from non-array value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -426,7 +492,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getStringSet("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-array value: items: null");
+        .hasMessage("Cannot parse JSON array from non-array value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
@@ -451,7 +517,7 @@ public class TestJsonUtil {
     Assertions.assertThatThrownBy(
             () -> JsonUtil.getStringMap("items", 
JsonUtil.mapper().readTree("{\"items\": null}")))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse from non-object value: items: null");
+        .hasMessage("Cannot parse string map from non-object value: items: 
null");
 
     Assertions.assertThatThrownBy(
             () ->
diff --git a/format/spec.md b/format/spec.md
index 58cfc22911..60c0f99c3f 100644
--- a/format/spec.md
+++ b/format/spec.md
@@ -1128,6 +1128,41 @@ Example
      ] } ]
 ```
 
+### Content File (Data and Delete) Serialization
+
+Content file (data or delete) is serialized as a JSON object according to the 
following table.
+
+| Metadata field           |JSON representation|Example|
+|--------------------------|--- |--- |
+| **`spec-id`**            |`JSON int`|`1`|
+| **`content`**            |`JSON string`|`DATA`, `POSITION_DELETES`, 
`EQUALITY_DELETES`|
+| **`file-path`**          |`JSON string`|`"s3://b/wh/data.db/table"`|
+| **`file-format`**        |`JSON string`|`AVRO`, `ORC`, `PARQUET`|
+| **`partition`**          |`JSON object: Partition data tuple using partition 
field ids for the struct field ids`|`{"1000":1}`|
+| **`record-count`**       |`JSON long`|`1`|
+| **`file-size-in-bytes`** |`JSON long`|`1024`|
+| **`column-sizes`**       |`JSON object: Map from column id to the total size 
on disk of all regions that store the 
column.`|`{"keys":[3,4],"values":[100,200]}`|
+| **`value-counts`**       |`JSON object: Map from column id to number of 
values in the column (including null and NaN 
values)`|`{"keys":[3,4],"values":[90,180]}`|
+| **`null-value-counts`**  |`JSON object: Map from column id to number of null 
values in the column`|`{"keys":[3,4],"values":[10,20]}`|
+| **`nan-value-counts`**   |`JSON object: Map from column id to number of NaN 
values in the column`|`{"keys":[3,4],"values":[0,0]}`|
+| **`lower-bounds`**       |`JSON object: Map from column id to lower bound 
binary in the column serialized as hexadecimal 
string`|`{"keys":[3,4],"values":["01000000","02000000"]}`|
+| **`upper-bounds`**       |`JSON object: Map from column id to upper bound 
binary in the column serialized as hexadecimal 
string`|`{"keys":[3,4],"values":["05000000","0A000000"]}`|
+| **`key-metadata`**       |`JSON string: Encryption key metadata binary 
serialized as hexadecimal string`|`00000000000000000000000000000000`|
+| **`split-offsets`**      |`JSON list of long: Split offsets for the data 
file`|`[128,256]`|
+| **`equality-ids`**       |`JSON list of int: Field ids used to determine row 
equality in equality delete files`|`[1]`|
+| **`sort-order-id`**      |`JSON int`|`1`|
+
+### File Scan Task Serialization
+
+File scan task is serialized as a JSON object according to the following table.
+
+| Metadata field       |JSON representation|Example|
+|--------------------------|--- |--- |
+| **`schema`**          |`JSON object`|`See above, read schemas instead`|
+| **`spec`**            |`JSON object`|`See above, read partition specs 
instead`|
+| **`data-file`**       |`JSON object`|`See above, read content file instead`|
+| **`delete-files`**    |`JSON list of objects`|`See above, read content file 
instead`|
+| **`residual-filter`** |`JSON object: residual filter 
expression`|`{"type":"eq","term":"id","value":1}`|
 
 ## Appendix D: Single-value serialization
 

Reply via email to