amogh-jahagirdar commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1134254487


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ScanTaskSetManager.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+
+public class ScanTaskSetManager {
+
+  private static final ScanTaskSetManager INSTANCE = new ScanTaskSetManager();
+
+  private final Map<Pair<String, String>, List<ScanTask>> tasksMap = 
Maps.newConcurrentMap();
+
+  private ScanTaskSetManager() {}
+
+  public static ScanTaskSetManager get() {
+    return INSTANCE;
+  }
+
+  public void stageTasks(Table table, String setID, List<ScanTask> tasks) {
+    Preconditions.checkArgument(
+        tasks != null && tasks.size() > 0, "Cannot stage null or empty tasks");
+    Pair<String, String> id = toID(table, setID);
+    tasksMap.put(id, tasks);
+  }
+
+  public List<ScanTask> fetchTasks(Table table, String setID) {
+    Pair<String, String> id = toID(table, setID);
+    return tasksMap.get(id);
+  }
+
+  public List<ScanTask> removeTasks(Table table, String setID) {

Review Comment:
   Is `removeTasks` actually needed? 



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ScanTaskSetManager.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+
+public class ScanTaskSetManager {
+
+  private static final ScanTaskSetManager INSTANCE = new ScanTaskSetManager();
+
+  private final Map<Pair<String, String>, List<ScanTask>> tasksMap = 
Maps.newConcurrentMap();
+
+  private ScanTaskSetManager() {}
+
+  public static ScanTaskSetManager get() {

Review Comment:
   Should this be `synchronized`?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -761,4 +818,270 @@ public void close() throws IOException {
       delegate.close();
     }
   }
+
+  class PositionDeleteBatchWrite implements BatchWrite {
+
+    private String fileSetID;
+
+    private PositionDeleteBatchWrite(String fileSetID) {
+      this.fileSetID = fileSetID;
+    }
+
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to 
executors
+      Broadcast<Table> tableBroadcast =
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+      return new PositionDeltaWriteFactory(
+          tableBroadcast, queryId, format, targetFileSize, writeSchema, 
dsSchema);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      PositionDeletesRewriteCoordinator coordinator = 
PositionDeletesRewriteCoordinator.get();
+      coordinator.stageRewrite(table, fileSetID, 
ImmutableSet.copyOf(files(messages)));
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      if (cleanupOnAbort) {
+        SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+      } else {
+        LOG.warn("Skipping cleanup of written files");
+      }
+    }
+
+    private List<DeleteFile> files(WriterCommitMessage[] messages) {
+      List<DeleteFile> files = Lists.newArrayList();
+
+      for (WriterCommitMessage message : messages) {
+        if (message != null) {
+          DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+          files.addAll(Arrays.asList(taskCommit.files()));
+        }
+      }
+
+      return files;
+    }
+  }
+
+  static class PositionDeltaWriteFactory implements DataWriterFactory {
+    private final Broadcast<Table> tableBroadcast;
+    private final String queryId;
+    private final FileFormat format;
+    private final Long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+
+    PositionDeltaWriteFactory(
+        Broadcast<Table> tableBroadcast,
+        String queryId,
+        FileFormat format,
+        long targetFileSize,
+        Schema writeSchema,
+        StructType dsSchema) {
+      this.tableBroadcast = tableBroadcast;
+      this.queryId = queryId;
+      this.format = format;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory deleteFileFactory =
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .suffix("deletes")
+              .build();
+
+      Schema positionDeleteRowSchema =
+          new Schema(
+              writeSchema
+                  .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+                  .type()
+                  .asStructType()
+                  .fields());
+      StructType deleteFileType =
+          new StructType(
+              new StructField[] {
+                dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+              });
+
+      SparkFileWriterFactory writerFactoryWithRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataFileFormat(format)
+              .dataSchema(writeSchema)
+              .dataSparkType(dsSchema)
+              .deleteFileFormat(format)
+              .positionDeleteRowSchema(positionDeleteRowSchema)
+              .positionDeleteSparkType(deleteFileType)
+              .build();
+
+      SparkFileWriterFactory writerFactoryWithoutRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataFileFormat(format)
+              .dataSchema(writeSchema)
+              .dataSparkType(dsSchema)
+              .deleteFileFormat(format)
+              .positionDeleteSparkType(deleteFileType)
+              .build();
+
+      return new DeleteWriter(
+          table,
+          writerFactoryWithRow,
+          writerFactoryWithoutRow,
+          deleteFileFactory,
+          targetFileSize,
+          dsSchema);
+    }
+  }
+
+  private static class DeleteWriter implements DataWriter<InternalRow> {
+    private final ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+    private final ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+    private final PositionDelete<InternalRow> positionDelete;
+    private final FileIO io;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper partitionRowWrapper;
+    private final Map<Integer, StructProjection> partitionProjections;
+    private final int specIdOrdinal;
+    private final Option<Integer> partitionOrdinalOption;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+    private final int rowOrdinal;
+    private final int rowSize;
+
+    private boolean closed = false;
+
+    /**
+     * Writer for position deletes metadata table.
+     *
+     * <p>Delete files need to either have 'row' as required field, or omit 
'row' altogether, for
+     * delete file stats accuracy Hence, this is a fanout writer, redirecting 
rows with null 'row'
+     * to one delegate, and non-null 'row' to another

Review Comment:
   I guess it should say 'or omit 'row' altogether to **ensure** delete file 
stats accuracy.'



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java:
##########
@@ -761,4 +818,270 @@ public void close() throws IOException {
       delegate.close();
     }
   }
+
+  class PositionDeleteBatchWrite implements BatchWrite {
+
+    private String fileSetID;
+
+    private PositionDeleteBatchWrite(String fileSetID) {
+      this.fileSetID = fileSetID;
+    }
+
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to 
executors
+      Broadcast<Table> tableBroadcast =
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+      return new PositionDeltaWriteFactory(
+          tableBroadcast, queryId, format, targetFileSize, writeSchema, 
dsSchema);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      PositionDeletesRewriteCoordinator coordinator = 
PositionDeletesRewriteCoordinator.get();
+      coordinator.stageRewrite(table, fileSetID, 
ImmutableSet.copyOf(files(messages)));
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      if (cleanupOnAbort) {
+        SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+      } else {
+        LOG.warn("Skipping cleanup of written files");
+      }
+    }
+
+    private List<DeleteFile> files(WriterCommitMessage[] messages) {
+      List<DeleteFile> files = Lists.newArrayList();
+
+      for (WriterCommitMessage message : messages) {
+        if (message != null) {
+          DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+          files.addAll(Arrays.asList(taskCommit.files()));
+        }
+      }
+
+      return files;
+    }
+  }
+
+  static class PositionDeltaWriteFactory implements DataWriterFactory {
+    private final Broadcast<Table> tableBroadcast;
+    private final String queryId;
+    private final FileFormat format;
+    private final Long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+
+    PositionDeltaWriteFactory(
+        Broadcast<Table> tableBroadcast,
+        String queryId,
+        FileFormat format,
+        long targetFileSize,
+        Schema writeSchema,
+        StructType dsSchema) {
+      this.tableBroadcast = tableBroadcast;
+      this.queryId = queryId;
+      this.format = format;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory deleteFileFactory =
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .suffix("deletes")
+              .build();
+
+      Schema positionDeleteRowSchema =
+          new Schema(
+              writeSchema
+                  .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+                  .type()
+                  .asStructType()
+                  .fields());
+      StructType deleteFileType =
+          new StructType(
+              new StructField[] {
+                dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+                dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+              });
+
+      SparkFileWriterFactory writerFactoryWithRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataFileFormat(format)
+              .dataSchema(writeSchema)
+              .dataSparkType(dsSchema)
+              .deleteFileFormat(format)
+              .positionDeleteRowSchema(positionDeleteRowSchema)
+              .positionDeleteSparkType(deleteFileType)
+              .build();
+
+      SparkFileWriterFactory writerFactoryWithoutRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataFileFormat(format)
+              .dataSchema(writeSchema)
+              .dataSparkType(dsSchema)
+              .deleteFileFormat(format)
+              .positionDeleteSparkType(deleteFileType)
+              .build();
+
+      return new DeleteWriter(
+          table,
+          writerFactoryWithRow,
+          writerFactoryWithoutRow,
+          deleteFileFactory,
+          targetFileSize,
+          dsSchema);
+    }
+  }
+
+  private static class DeleteWriter implements DataWriter<InternalRow> {
+    private final ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+    private final ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+    private final PositionDelete<InternalRow> positionDelete;
+    private final FileIO io;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper partitionRowWrapper;
+    private final Map<Integer, StructProjection> partitionProjections;
+    private final int specIdOrdinal;
+    private final Option<Integer> partitionOrdinalOption;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+    private final int rowOrdinal;
+    private final int rowSize;
+
+    private boolean closed = false;
+
+    /**
+     * Writer for position deletes metadata table.
+     *
+     * <p>Delete files need to either have 'row' as required field, or omit 
'row' altogether, for
+     * delete file stats accuracy Hence, this is a fanout writer, redirecting 
rows with null 'row'
+     * to one delegate, and non-null 'row' to another

Review Comment:
   Are there any skewness issues possible with fanning out across non-null and 
null rows? For example one writer becomes responsible for more work than the 
other? I'm also just trying to see if there's a need to distinguish the 
different writers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to