kbendick commented on a change in pull request #3323:
URL: https://github.com/apache/iceberg/pull/3323#discussion_r735250691



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/CommitResult.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.Serializable;
+import java.util.Collection;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.WriteResult;
+
+class CommitResult implements Serializable {
+
+  private final long snapshotId;
+  private final long sequenceNumber;
+  private final int specId;
+  private final StructLike partition;
+  private final WriteResult writeResult;
+
+  private CommitResult(long snapshotId,
+                       long sequenceNumber,
+                       int specId,
+                       StructLike partition,
+                       WriteResult writeResult) {
+    this.snapshotId = snapshotId;
+    this.sequenceNumber = sequenceNumber;
+    this.specId = specId;
+    this.partition = partition;
+    this.writeResult = writeResult;
+  }
+
+  long snapshotId() {
+    return snapshotId;
+  }
+
+  long sequenceNumber() {
+    return sequenceNumber;
+  }
+
+  public int specId() {
+    return specId;
+  }
+
+  StructLike partition() {
+    return partition;
+  }
+
+  WriteResult writeResult() {
+    return writeResult;
+  }
+
+  static Builder builder(long snapshotId, long sequenceNumber) {
+    return new Builder(snapshotId, sequenceNumber);
+  }
+
+  static class Builder {
+
+    private final long snapshotId;
+    private final long sequenceNumber;
+    private int specId;
+    private StructLike partition;
+    private final WriteResult.Builder writeResult;
+
+    private Builder(long snapshotId, long sequenceNumber) {
+      this.snapshotId = snapshotId;
+      this.sequenceNumber = sequenceNumber;
+      this.specId = 0;
+      this.partition = null;
+      this.writeResult = WriteResult.builder();
+    }
+
+    Builder partition(int newSpecId, StructLike newPartition) {
+      this.specId = newSpecId;
+      this.partition = newPartition;
+      return this;
+    }
+
+    Builder addDataFile(Collection<DataFile> files) {
+      writeResult.addDataFiles(files);
+      return this;
+    }
+
+    Builder addDeleteFile(Collection<DeleteFile> files) {
+      writeResult.addDeleteFiles(files);
+      return this;
+    }
+
+    Builder addReferencedDataFile(Collection<CharSequence> files) {
+      writeResult.addReferencedDataFiles(files);

Review comment:
       Clarifying question: are these the data files for which rows in the delete
dataset are found (i.e. the data files referenced by the position deletes)?

##########
File path: core/src/main/java/org/apache/iceberg/StaticDataTableScan.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StaticDataTableScan extends DataTableScan {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StaticDataTableScan.class);
+
+  private static final long DUMMY_SNAPSHOT_ID = 0L;
+
+  private final List<ManifestFile> dataManifests;
+  private final List<ManifestFile> deleteManifests;
+
+  private StaticDataTableScan(Table table,
+                              TableOperations ops) {
+    super(ops, table);
+    this.dataManifests = Lists.newArrayList();
+    this.deleteManifests = Lists.newArrayList();
+  }
+
+  @Override
+  public TableScan useSnapshot(long scanSnapshotId) {
+    throw new UnsupportedOperationException(String.format(
+        "Scan table using scan snapshot id %s is not supported", 
scanSnapshotId));
+  }
+
+  @Override
+  public TableScan asOfTime(long timestampMillis) {
+    throw new UnsupportedOperationException(String.format(
+        "Scan table as of time %s is not supported", timestampMillis));
+  }
+
+  @Override
+  public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
+    throw new UnsupportedOperationException("Incremental scan is not 
supported");
+  }
+
+  @Override
+  public TableScan appendsAfter(long fromSnapshotId) {
+    throw new UnsupportedOperationException("Incremental scan is not 
supported");
+  }
+
+  @Override
+  public Snapshot snapshot() {
+    return null;
+  }
+
+  public TableScan scan(Iterable<ManifestFile> newManifests) {
+    for (ManifestFile manifestFile : newManifests) {
+      Preconditions.checkArgument(manifestFile != null, "Cannot scan a null 
manifest file");
+      switch (manifestFile.content()) {
+        case DATA:
+          dataManifests.add(manifestFile);
+          break;
+        case DELETES:
+          deleteManifests.add(manifestFile);
+          break;
+        default: throw new IllegalArgumentException("Unknown ManifestContent 
type: " + manifestFile.content());
+      }
+    }
+    return this;
+  }
+
+  @Override
+  public CloseableIterable<FileScanTask> planFiles() {
+    LOG.info("Scanning table {} partially with filter {}", table(), 
context().rowFilter());
+
+    Listeners.notifyAll(new ScanEvent(table().name(), DUMMY_SNAPSHOT_ID, 
context().rowFilter(), schema()));

Review comment:
       Is emitting `DUMMY_SNAPSHOT_ID` in the scan event normal for WAP events? I'm admittedly not as
familiar with them. Will this break the WAP (write-audit-publish) functionality?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/DataFileGroup.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class DataFileGroup implements KryoSerializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileGroup.class);
+
+  private long latestSequenceNumber;
+  private long lastSnapshotId;
+  private int filesCount;
+  private long filesSize;
+  private transient List<DeltaManifests> manifestsList;
+
+  DataFileGroup() {
+    this.latestSequenceNumber = 0;
+    this.lastSnapshotId = 0;
+    this.filesCount = 0;
+    this.filesSize = 0;
+    this.manifestsList = Lists.newArrayList();
+  }
+
+  long latestSequenceNumber() {
+    return latestSequenceNumber;
+  }
+
+  long latestSnapshotId() {
+    return lastSnapshotId;
+  }
+
+  int filesCount() {
+    return filesCount;
+  }
+
+  long filesSize() {
+    return filesSize;
+  }
+
+  List<DeltaManifests> manifestsList() {
+    return manifestsList;
+  }
+
+  Iterable<ManifestFile> manifestFiles() {
+    return Iterables.concat(Lists.transform(manifestsList, 
DeltaManifests::manifests));
+  }
+
+  void append(int dataFilesCount, long dataFliesSize, long snapshotId, long 
sequenceNumber,
+              DeltaManifests deltaManifests) throws IOException {
+    if (deltaManifests == null || deltaManifests.manifests().isEmpty()) {
+      return;
+    }
+
+    if (sequenceNumber > latestSequenceNumber) {
+      latestSequenceNumber = sequenceNumber;
+      lastSnapshotId = snapshotId;
+    }
+
+    filesCount += dataFilesCount;
+    filesSize += dataFliesSize;
+    manifestsList.add(deltaManifests);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    try {
+      output.writeLong(latestSequenceNumber);
+      output.writeLong(lastSnapshotId);
+      output.writeInt(filesCount);
+      output.writeLong(filesSize);
+
+      int size = manifestsList.size();
+      output.writeInt(size);
+      for (DeltaManifests manifests : manifestsList) {
+        byte[] data = SimpleVersionedSerialization.writeVersionAndSerialize(
+            DeltaManifestsSerializer.INSTANCE, manifests);
+        output.writeInt(data.length);
+        output.writeBytes(data);
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot serialize data file group by kryo.", e);

Review comment:
       If this catch block is hit (which I know shouldn't normally happen), could the user
possibly lose data? The `IOException` is logged but not rethrown, so serialization would
silently produce a truncated `DataFileGroup`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to