openinx commented on a change in pull request #1663:
URL: https://github.com/apache/iceberg/pull/1663#discussion_r528278694
##########
File path: core/src/main/java/org/apache/iceberg/io/BaseDeltaWriter.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeltaWriter<T> implements DeltaWriter<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseDeltaWriter.class);
+
+  private final RollingContentFileWriter<DataFile, T> dataWriter;
+  private final RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter;
+  private final RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter;
+
+  private final PositionDelete<T> positionDelete = new PositionDelete<>();
+  private final StructLikeMap<List<FilePos>> insertedRowMap;
+
+  // Function to convert the generic data to a StructLike.
+  private final Function<T, StructLike> structLikeFun;
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter) {
+    this(dataWriter, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter) {
+    this(dataWriter, posDeleteWriter, null, null, null, null);
+  }
+
+  public BaseDeltaWriter(RollingContentFileWriter<DataFile, T> dataWriter,
+                         RollingContentFileWriter<DeleteFile, PositionDelete<T>> posDeleteWriter,
+                         RollingContentFileWriter<DeleteFile, T> equalityDeleteWriter,
+                         Schema tableSchema,
+                         List<Integer> equalityFieldIds,
+                         Function<T, StructLike> structLikeFun) {
+
+    Preconditions.checkNotNull(dataWriter, "Data writer should always not be null.");
+
+    if (posDeleteWriter == null) {
+      // Only accept INSERT records.
+      Preconditions.checkArgument(equalityDeleteWriter == null);
+    }
+
+    if (posDeleteWriter != null && equalityDeleteWriter == null) {
+      // Only accept INSERT records and position deletion.
+      Preconditions.checkArgument(tableSchema == null);
+      Preconditions.checkArgument(equalityFieldIds == null);
+    }
+
+    if (equalityDeleteWriter != null) {
+      // Accept insert records, position deletion, equality deletions.
+      Preconditions.checkNotNull(posDeleteWriter,
+          "Position delete writer shouldn't be null when writing equality deletions.");
+      Preconditions.checkNotNull(tableSchema, "Iceberg table schema shouldn't be null");
+      Preconditions.checkNotNull(equalityFieldIds, "Equality field ids shouldn't be null");
+      Preconditions.checkNotNull(structLikeFun, "StructLike function shouldn't be null");
+
+      Schema deleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(equalityFieldIds));
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.structLikeFun = structLikeFun;
+    } else {
+      this.insertedRowMap = null;
+      this.structLikeFun = null;
+    }
+
+    this.dataWriter = dataWriter;
+    this.equalityDeleteWriter = equalityDeleteWriter;
+    this.posDeleteWriter = posDeleteWriter;
+  }
+
+  @Override
+  public void writeRow(T row) throws IOException {
+    if (enableEqualityDelete()) {
+      FilePos filePos = FilePos.create(dataWriter.currentPath(), dataWriter.currentPos());
+      insertedRowMap.compute(structLikeFun.apply(row), (k, v) -> {
+        if (v == null) {
+          return Lists.newArrayList(filePos);
+        } else {
+          v.add(filePos);
+          return v;

Review comment:
Thinking about it again, my conclusion is that we should choose the following solution:

1. When an INSERT comes in:
   a. Eliminate the duplicated INSERT within the same checkpoint for a given `TaskWriter` (by adding a delete to replace the old record, as you said);
   b. Write it into the data file, and just ignore duplicated INSERTs across checkpoints.
2. When a DELETE comes in:
   a. Write to the pos-delete file if the key exists in the `insertedRowMap`;
   b. Always write the equality-delete.

First, eliminating a duplicated INSERT within a checkpoint is easy, because we already have the `insertedRowMap`: when a duplicated insert key arrives, we just replace the old INSERT (by writing a position delete against it) with the new INSERT (by writing the new row to the data file). The `insertedRowMap` doesn't have to track a `List<FilePos>` per key; a single `FilePos` per key is enough.

Second, eliminating duplicated INSERTs across checkpoints would be high-cost, and we don't have to: the equality deletes will mask all duplicated INSERTs for the same key from previous checkpoints.

The code should look like this:

```java
@Override
public void writeRow(T row) {
  if (allowEqDelete()) {
    RowOffset rowOffset = RowOffset.create(dataWriter.currentPath(), dataWriter.currentRows());

    // Copy the key to avoid messing up the insertedRowMap.
    StructLike copiedKey = asCopiedKey(row);

    // Adding a pos-delete to replace the old row.
    RowOffset previous = insertedRowMap.put(copiedKey, rowOffset);
    if (previous != null) {
      posDeleteWriter.write(positionDelete.set(previous.path, previous.rowId, row));
    }
  }

  dataWriter.write(row);
}

@Override
public void writeEqualityDelete(T equalityDelete) {
  Preconditions.checkState(allowEqDelete(), "Could not accept equality deletion.");

  StructLike key = asKey(equalityDelete);

  RowOffset previous = insertedRowMap.get(key);
  if (previous != null) {
    posDeleteWriter.write(positionDelete.set(previous.path, previous.rowId, equalityDelete));
    insertedRowMap.remove(key);
  }

  eqDeleteWriter.write(equalityDelete);
}
```
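To make the proposed flow concrete, here is a self-contained toy sketch of the write pattern above. It substitutes `String` keys for `StructLike`, a plain `HashMap` for `StructLikeMap`, and `printf` calls for the actual file writers; `DedupSketch`, `RowOffset`, and every other name in it are illustrative assumptions, not classes from this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class DedupSketch {
  // Hypothetical stand-in for the (path, rowId) pair the snippet above assumes:
  // it points at a row previously written to an in-progress data file.
  static class RowOffset {
    final String path;
    final long rowId;

    RowOffset(String path, long rowId) {
      this.path = path;
      this.rowId = rowId;
    }
  }

  // Tracks, per key, the offset of the latest INSERT in the current checkpoint.
  private final Map<String, RowOffset> insertedRowMap = new HashMap<>();
  private long nextRowId = 0;

  void writeRow(String key) {
    RowOffset offset = new RowOffset("data-file-1", nextRowId++);
    RowOffset previous = insertedRowMap.put(key, offset);
    if (previous != null) {
      // Duplicated INSERT within the same checkpoint: mask the old row cheaply
      // with a position delete before writing the new one.
      System.out.printf("pos-delete %s@%d%n", previous.path, previous.rowId);
    }
    System.out.printf("insert %s -> %s@%d%n", key, offset.path, offset.rowId);
  }

  void writeEqualityDelete(String key) {
    RowOffset previous = insertedRowMap.remove(key);
    if (previous != null) {
      // The key was inserted in this checkpoint, so a position delete suffices.
      System.out.printf("pos-delete %s@%d%n", previous.path, previous.rowId);
    }
    // Written unconditionally, so inserts from earlier checkpoints are masked too.
    System.out.printf("equality-delete %s%n", key);
  }

  public static void main(String[] args) {
    DedupSketch writer = new DedupSketch();
    writer.writeRow("k1");            // insert k1 at row 0
    writer.writeRow("k1");            // duplicate in checkpoint: pos-delete row 0, insert at row 1
    writer.writeEqualityDelete("k1"); // pos-delete row 1, then equality-delete k1
  }
}
```

Running it shows one position delete per duplicated key within the checkpoint, while the equality delete is emitted unconditionally; that unconditional write is what masks duplicates that survive from earlier checkpoints, which is why the map never needs to span checkpoint boundaries.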
