[
https://issues.apache.org/jira/browse/HIVE-26183?focusedWorklogId=765332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765332
]
ASF GitHub Bot logged work on HIVE-26183:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/May/22 08:54
Start Date: 03/May/22 08:54
Worklog Time Spent: 10m
Work Description: marton-bod commented on code in PR #3251:
URL: https://github.com/apache/hive/pull/3251#discussion_r863586343
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out of order
records.
+ * We need to keep the incoming records in memory until they are written out.
To keep the memory consumption minimal
+ * we only write out {@link PositionDelete} files where the row data is
omitted, so only the filenames and the rowIds
+ * have to be in the memory.
+ */
+public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
+
+ private static final String DELETE_FILE_THREAD_POOL_SIZE =
"iceberg.delete.file.thread.pool.size";
+ private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
+
+ // Storing deleted data in a map Partition -> FileName -> BitMap
+ private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer =
Maps.newHashMap();
+ private final Map<Integer, PartitionSpec> specs;
+ private final Map<PartitionKey, PartitionSpec> keyToSpec = Maps.newHashMap();
+ private final FileFormat format;
+ private final FileWriterFactory<Record> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final long targetFileSize;
+ private final Configuration configuration;
+ private final Record record;
+ private final InternalRecordWrapper wrapper;
+ private FilesForCommit filesForCommit;
+
+ HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec>
specs, FileFormat format,
+ FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory,
FileIO io, long targetFileSize,
+ Configuration configuration) {
+ this.specs = specs;
+ this.format = format;
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.targetFileSize = targetFileSize;
+ this.configuration = configuration;
+ this.wrapper = new InternalRecordWrapper(schema.asStruct());
+ this.record = GenericRecord.create(schema);
+ }
+
+ @Override
+ public void write(Writable row) throws IOException {
+ Record rec = ((Container<Record>) row).get();
+ IcebergAcidUtil.getOriginalFromUpdatedRecord(rec, record);
+ String filePath = (String) rec.getField(MetadataColumns.FILE_PATH.name());
+ int specId = IcebergAcidUtil.parseSpecId(rec);
+
+ Map<String, Roaring64Bitmap> deleteMap =
+ buffer.computeIfAbsent(partition(record, specId), key -> {
+ keyToSpec.put(key, specs.get(specId));
+ return Maps.newHashMap();
+ });
+ Roaring64Bitmap deletes = deleteMap.computeIfAbsent(filePath, unused ->
new Roaring64Bitmap());
+ deletes.add((Long) rec.getField(MetadataColumns.ROW_POSITION.name()));
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ long startTime = System.currentTimeMillis();
+ Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
+ if (!abort) {
+ LOG.info("Delete file flush is started");
+ ExecutorService fileExecutor = fileExecutor(configuration,
buffer.size());
+ try {
+ Tasks.foreach(buffer.keySet())
+ .retry(3)
+ .executeWith(fileExecutor)
+ .onFailure((partition, exception) -> LOG.info("Failed to write
delete file {}", partition, exception))
+ .run(partition -> {
+ PositionDelete<Record> positionDelete = PositionDelete.create();
+ PartitioningWriter writerForFiles;
+ try (PartitioningWriter writer =
+ new ClusteredPositionDeleteWriter<>(writerFactory,
fileFactory, io, format, targetFileSize)) {
+ Map<String, Roaring64Bitmap> partitionData =
buffer.get(partition);
+ for (String filePath : new TreeSet<>(partitionData.keySet())) {
Review Comment:
I see. My thinking was that `new TreeSet<>(partitionData.keySet())` takes
O(n log n) to sort the keys, so overall the O(log n) insertion into a TreeMap
would still be cheaper.
Issue Time Tracking
-------------------
Worklog Id: (was: 765332)
Time Spent: 3h 50m (was: 3h 40m)
> Create delete writer for the UPDATE statements
> ----------------------------------------------
>
> Key: HIVE-26183
> URL: https://issues.apache.org/jira/browse/HIVE-26183
> Project: Hive
> Issue Type: Sub-task
> Reporter: Peter Vary
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> During the investigation of updates on partitioned tables we found the
> following issue:
> - Iceberg inserts need to be sorted by the new partition keys
> - Iceberg deletes need to be sorted by the old partition keys and
> filenames
> These requirements can contradict each other. OTOH a Hive update creates a
> single query and writes out the insert/delete records for every row. This
> would mean plenty of open writers.
> We might want to create something like a
> https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java,
> but we do not want to keep the whole rows in memory.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)