gaoyunhaii commented on a change in pull request #18680: URL: https://github.com/apache/flink/pull/18680#discussion_r803265948
########## File path: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java ########## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * The compactors use the {@link OutputStreamBasedCompactingFileWriter} to directly write a + * compacting file as an {@link OutputStream}. + */ +public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter { + /** + * Get the output stream underlying the writer. The close method of the returned stream should Review comment: `Get` -> `Gets` ########## File path: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java ########## @@ -39,6 +39,32 @@ InProgressFileWriter<IN, BucketID> openNewInProgressFile( final BucketID bucketID, final Path path, final long creationTime) throws IOException; + /** + * Used to create a new {@link CompactingFileWriter} of the requesting type. A {@link + * InProgressFileWriter} will be created by default, which supports only the RECORD_WISE type. + * Requesting a writer of an unsupported type will result in an UnsupportedOperationException. Review comment: `an UnsupportedOperationException` -> `UnsupportedOperationException`? Also I think we do not create an `InProgressFileWriter` by default; might reword `A {@link InProgressFileWriter} will be created by default` to say that it is created only for the RECORD_WISE type. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Coordinator that coordinates file compaction for the {@link FileSink}. + * + * <p>All committable emitted from the writers are collected and packed into {@link + * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When + * a firing condition is met, the requests will be sent to the {@link CompactorOperator}. + * + * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a + * request at any time. A {@link CompactorOperator} must ensure that the ownership of the + * committable in a compact request is successfully handed from the coordinator, before it can + * actually perform the compaction. 
+ */ +public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest> + implements OneInputStreamOperator< + CommittableMessage<FileSinkCommittable>, CompactorRequest>, + BoundedOneInput { + + private static final ListStateDescriptor<byte[]> REMAINING_COMMITTABLE_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_compact_commt_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final FileCompactStrategy strategy; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final Map<String, CompactorRequest> packing = new HashMap<>(); + private final Map<String, CompactTrigger> triggers = new HashMap<>(); + + private ListState<FileSinkCommittable> remainingCommittableState; + + public CompactCoordinator( + FileCompactStrategy strategy, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) { + this.strategy = strategy; + this.committableSerializer = checkNotNull(committableSerializer); + } + + @Override + public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element) + throws Exception { + CommittableMessage<FileSinkCommittable> message = element.getValue(); + if (message instanceof CommittableWithLineage) { + FileSinkCommittable committable = + ((CommittableWithLineage<FileSinkCommittable>) element.getValue()) + .getCommittable(); + String bucketId = committable.getBucketId(); + if (packAndTrigger(bucketId, committable)) { + fireAndPurge(bucketId); + } + } + // or message instanceof CommittableSummary + // info in CommittableSummary is not necessary for compacting at present, ignore it + } + + private boolean packAndTrigger(String bucketId, FileSinkCommittable committable) { + CompactorRequest bucketRequest = packing.computeIfAbsent(bucketId, CompactorRequest::new); + if (committable.hasInProgressFileToCleanup() || committable.hasCompactedFileToCleanup()) { Review comment: Perhaps first assert here that such committables must not have pending files: `checkState(!committable.hasPendingFile())`. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Coordinator that coordinates file compaction for the {@link FileSink}. + * + * <p>All committable emitted from the writers are collected and packed into {@link + * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When + * a firing condition is met, the requests will be sent to the {@link CompactorOperator}. + * + * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a + * request at any time. A {@link CompactorOperator} must ensure that the ownership of the + * committable in a compact request is successfully handed from the coordinator, before it can + * actually perform the compaction. 
+ */ +public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest> + implements OneInputStreamOperator< + CommittableMessage<FileSinkCommittable>, CompactorRequest>, + BoundedOneInput { + + private static final ListStateDescriptor<byte[]> REMAINING_COMMITTABLE_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_compact_commt_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final FileCompactStrategy strategy; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final Map<String, CompactorRequest> packing = new HashMap<>(); + private final Map<String, CompactTrigger> triggers = new HashMap<>(); + + private ListState<FileSinkCommittable> remainingCommittableState; + + public CompactCoordinator( + FileCompactStrategy strategy, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) { + this.strategy = strategy; + this.committableSerializer = checkNotNull(committableSerializer); + } + + @Override + public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element) + throws Exception { + CommittableMessage<FileSinkCommittable> message = element.getValue(); + if (message instanceof CommittableWithLineage) { + FileSinkCommittable committable = + ((CommittableWithLineage<FileSinkCommittable>) element.getValue()) + .getCommittable(); + String bucketId = committable.getBucketId(); + if (packAndTrigger(bucketId, committable)) { + fireAndPurge(bucketId); + } + } + // or message instanceof CommittableSummary + // info in CommittableSummary is not necessary for compacting at present, ignore it + } + + private boolean packAndTrigger(String bucketId, FileSinkCommittable committable) { + CompactorRequest bucketRequest = packing.computeIfAbsent(bucketId, CompactorRequest::new); + if (committable.hasInProgressFileToCleanup() || committable.hasCompactedFileToCleanup()) { + // cleanup request, pass through directly + bucketRequest.addToPassthrough(committable); + return false; + } + + if (!committable.hasPendingFile()) { + throw new RuntimeException("Committable to compact has no content."); + } + + CompactTrigger trigger = + triggers.computeIfAbsent(bucketId, id -> new CompactTrigger(strategy)); + CompactTriggerResult triggerResult = trigger.onElement(committable); + switch (triggerResult) { + case PASS_THROUGH: + bucketRequest.addToPassthrough(committable); + return false; + case CONTINUE: + bucketRequest.addToCompact(committable); + return false; + case FIRE_AND_PURGE: + bucketRequest.addToCompact(committable); + return true; + default: + throw new RuntimeException("Unexpected trigger result:" + triggerResult); + } + } + + private void fireAndPurge(String bucketId) { + triggers.remove(bucketId); + CompactorRequest request = packing.remove(bucketId); + if (request != null) { + output.collect(new StreamRecord<>(request)); + } + } + + @Override + public void endInput() throws Exception { + // emit all requests remained + for (CompactorRequest request : packing.values()) { + output.collect(new StreamRecord<>(request)); + } + packing.clear(); + triggers.clear(); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + super.prepareSnapshotPreBarrier(checkpointId); + + // trigger on checkpoint + List<String> bucketsToFire = new ArrayList<>(triggers.size()); + for (Map.Entry<String, CompactTrigger> e : triggers.entrySet()) { + String bucketId = e.getKey(); + CompactTrigger trigger = e.getValue(); + if (trigger.onCheckpoint(checkpointId) == 
CompactTriggerResult.FIRE_AND_PURGE) { + bucketsToFire.add(bucketId); + } + } + bucketsToFire.forEach(this::fireAndPurge); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + List<FileSinkCommittable> remainingCommittable = + packing.values().stream() + .flatMap(r -> r.getCommittableToCompact().stream()) + .collect(Collectors.toList()); + packing.values().stream() + .flatMap(r -> r.getCommittableToPassthrough().stream()) + .forEach(remainingCommittable::add); + remainingCommittableState.update(remainingCommittable); + + // triggers will be recomputed when restoring so it's not necessary to store + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + remainingCommittableState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC), + committableSerializer); + + Iterable<FileSinkCommittable> stateRemaining = remainingCommittableState.get(); + if (stateRemaining != null) { + for (FileSinkCommittable committable : stateRemaining) { + // restore and redistribute + String bucketId = committable.getBucketId(); + if (packAndTrigger(bucketId, committable)) { + fireAndPurge(bucketId); + } + } + } + } + + enum CompactTriggerResult { + CONTINUE, + FIRE_AND_PURGE, + PASS_THROUGH + } + + private static class CompactTrigger { + private final long threshold; + private final boolean compactOnCheckpoint; + + private long size; + + CompactTrigger(FileCompactStrategy strategy) { + this.threshold = strategy.getSizeThreshold(); + this.compactOnCheckpoint = strategy.isCompactOnCheckpoint(); + } + + public CompactTriggerResult onElement(FileSinkCommittable committable) { Review comment: Have we considered the case where, when starting from an existing file sink, some committables might not target a temporary file, i.e. the path does not start with '.'? ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An operator that perform compaction for the {@link FileSink}. + * + * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and + * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all + * requests received before can be submitted. The results can be emitted at the next {@link + * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers + * can receive only one CommittableSummary and the corresponding number of Committable for a single + * checkpoint. 
+ */ +public class CompactorOperator + extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> + implements OneInputStreamOperator< + CompactorRequest, CommittableMessage<FileSinkCommittable>>, + BoundedOneInput, + CheckpointListener { + + private static final String COMPACTED_PREFIX = "compacted-"; + private static final long SUBMITTED_ID = -1L; + + private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final int compactThreads; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final FileCompactor fileCompactor; + private final BucketWriter<?, String> bucketWriter; + + private transient ExecutorService compactService; + + private List<CompactorRequest> collectingRequests = new ArrayList<>(); + private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>(); + private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> + compactingRequests = new LinkedList<>(); + + private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; + + public CompactorOperator( + int compactThreads, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, + FileCompactor fileCompactor, + BucketWriter<?, String> bucketWriter) { + this.compactThreads = compactThreads; + this.committableSerializer = committableSerializer; + this.fileCompactor = fileCompactor; + this.bucketWriter = bucketWriter; + } + + @Override + public void open() throws Exception { + super.open(); + this.compactService = + Executors.newFixedThreadPool( + Math.max(1, Math.min(compactThreads, Hardware.getNumberCPUCores())), + new ExecutorThreadFactory("compact-executor")); + } + + @Override + public void processElement(StreamRecord<CompactorRequest> element) throws Exception { + collectingRequests.add(element.getValue()); + } + + @Override + public void endInput() throws Exception { + // add collecting requests into the final snapshot + snapshotRequests.put(Long.MAX_VALUE, collectingRequests); + collectingRequests = new ArrayList<>(); + + // submit all requests and wait until they are done + submitUntil(Long.MAX_VALUE); + assert snapshotRequests.isEmpty(); + + getAllTasksFuture().join(); + emitCompacted(null); + assert compactingRequests.isEmpty(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + submitUntil(checkpointId); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + emitCompacted(checkpointId); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + // add collecting requests during the checkpoint into the snapshot + snapshotRequests.put(context.getCheckpointId(), collectingRequests); + collectingRequests = new ArrayList<>(); + + // snapshot all compacting requests as well, including the requests that are not finished + // when invoking prepareSnapshotPreBarrier but finished now, since they are not emitted yet + Map<Long, List<CompactorRequest>> requests = new HashMap<>(snapshotRequests); + requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>()) + .addAll(compactingRequests.stream().map(r -> r.f0).collect(Collectors.toList())); + remainingRequestsState.update(Collections.singletonList(requests)); + } + + @Override + public 
void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + remainingRequestsState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(REMAINING_REQUESTS_RAW_STATES_DESC), + new RemainingRequestsSerializer( + new CompactorRequestSerializer(committableSerializer))); + + Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = remainingRequestsState.get(); + if (stateRemaining != null) { + for (Map<Long, List<CompactorRequest>> requests : stateRemaining) { + // elements can be more than one when redistributed after parallelism changing + for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) { + List<CompactorRequest> list = + snapshotRequests.computeIfAbsent(e.getKey(), id -> new ArrayList<>()); + list.addAll(e.getValue()); + } + } + } + // submit all requests that is already submitted before this checkpoint + submitUntil(SUBMITTED_ID); + } + + private void submitUntil(long checkpointId) { + NavigableMap<Long, List<CompactorRequest>> canSubmit = + snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, true); + Iterator<Entry<Long, List<CompactorRequest>>> iter = canSubmit.entrySet().iterator(); Review comment: Also, in this case we might not need to use the iterator directly: ``` for (Map.Entry<Long, List<CompactorRequest>> entry : canSubmit.entrySet()) { } canSubmit.clear(); ``` ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil basePath, bulkWriterFactory, new DateTimeBucketAssigner<>()); } + public BucketWriter<IN, String> createBucketWriter() throws IOException { + return bucketsBuilder.createBucketWriter(); + } + + public FileCompactor getFileCompactor() { + return bucketsBuilder.getFileCompactor(); + } + + @Override + public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology( + DataStream<CommittableMessage<FileSinkCommittable>> committableStream) { + FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy(); + if (strategy == null) { + // not enabled + return committableStream; + } + + SingleOutputStreamOperator<CompactorRequest> coordinatorOp = + committableStream + .rescale() + .transform( Review comment: We would need to set the uid in some way for these operators since they have state. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil basePath, bulkWriterFactory, new DateTimeBucketAssigner<>()); } + public BucketWriter<IN, String> createBucketWriter() throws IOException { + return bucketsBuilder.createBucketWriter(); + } + + public FileCompactor getFileCompactor() { + return bucketsBuilder.getFileCompactor(); + } + + @Override + public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology( + DataStream<CommittableMessage<FileSinkCommittable>> committableStream) { + FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy(); + if (strategy == null) { + // not enabled + return committableStream; + } + + SingleOutputStreamOperator<CompactorRequest> coordinatorOp = + committableStream + .rescale() Review comment: The default `rebalance` would be the same as `rescale` here?
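To illustrate the uid point above, the coordinator wiring could look roughly like the following; this is only a sketch, and the uid string as well as the `requestTypeInfo` / `coordinatorFactory` variable names are placeholders, not from this PR:

```java
SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
        committableStream
                .rescale() // or rebalance(), per the question above
                .transform(
                        "CompactorCoordinator",
                        requestTypeInfo, // placeholder for the CompactorRequestTypeInfo built here
                        coordinatorFactory) // placeholder for the CompactCoordinatorFactory
                .uid("file-sink-compact-coordinator") // placeholder uid; must stay stable across job upgrades
                .setParallelism(1)
                .setMaxParallelism(1);
```

Since both the coordinator and the compactor operator keep operator state, stable uids are what allow that state to be matched again when restoring from a savepoint.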
########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -275,6 +339,18 @@ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) { return self(); } + public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) { + this.compactStrategy = strategy; Review comment: checkNotNull, also for the following fileCompactor. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.Nullable; + 
+import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An operator that perform compaction for the {@link FileSink}. + * + * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and + * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all + * requests received before can be submitted. The results can be emitted at the next {@link + * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers + * can receive only one CommittableSummary and the corresponding number of Committable for a single + * checkpoint. + */ +public class CompactorOperator + extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> + implements OneInputStreamOperator< + CompactorRequest, CommittableMessage<FileSinkCommittable>>, + BoundedOneInput, + CheckpointListener { + + private static final String COMPACTED_PREFIX = "compacted-"; + private static final long SUBMITTED_ID = -1L; + + private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final int compactThreads; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final FileCompactor fileCompactor; + private final BucketWriter<?, String> bucketWriter; + + private transient ExecutorService compactService; + + private List<CompactorRequest> collectingRequests = new ArrayList<>(); + private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>(); + private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> + compactingRequests = new LinkedList<>(); + + private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; + + public CompactorOperator( + int compactThreads, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, + FileCompactor fileCompactor, + BucketWriter<?, String> bucketWriter) { + this.compactThreads = compactThreads; + this.committableSerializer = committableSerializer; + this.fileCompactor = fileCompactor; + this.bucketWriter = bucketWriter; + } + + @Override + public void open() throws Exception { + super.open(); + this.compactService = Review comment: This service is not shut down? We might need to try to close the service on `close()` and `finalize()`. See `SystemProcessingTimeService`. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? 
extends BucketsBuilder<IN, ?>> bucketsBuil basePath, bulkWriterFactory, new DateTimeBucketAssigner<>()); } + public BucketWriter<IN, String> createBucketWriter() throws IOException { + return bucketsBuilder.createBucketWriter(); + } + + public FileCompactor getFileCompactor() { + return bucketsBuilder.getFileCompactor(); + } + + @Override + public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology( + DataStream<CommittableMessage<FileSinkCommittable>> committableStream) { + FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy(); + if (strategy == null) { + // not enabled + return committableStream; + } + + SingleOutputStreamOperator<CompactorRequest> coordinatorOp = + committableStream + .rescale() + .transform( + "CompactorCoordinator", + new CompactorRequestTypeInfo( + bucketsBuilder::getCommittableSerializer), + new CompactCoordinatorFactory(this, strategy)) + .setParallelism(1) + .setMaxParallelism(1); + + TypeInformation<CommittableMessage<FileSinkCommittable>> committableType = + committableStream.getType(); + return coordinatorOp + .shuffle() Review comment: Do we have a special reason to use shuffle? `shuffle` incurs generating random numbers, which might cause additional CPU usage. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Coordinator that coordinates file compaction for the {@link FileSink}. + * + * <p>All committable emitted from the writers are collected and packed into {@link + * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When + * a firing condition is met, the requests will be sent to the {@link CompactorOperator}. + * + * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a + * request at any time. A {@link CompactorOperator} must ensure that the ownership of the + * committable in a compact request is successfully handed from the coordinator, before it can + * actually perform the compaction. + */ +public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest> + implements OneInputStreamOperator< + CommittableMessage<FileSinkCommittable>, CompactorRequest>, + BoundedOneInput { + + private static final ListStateDescriptor<byte[]> REMAINING_COMMITTABLE_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_compact_commt_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final FileCompactStrategy strategy; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final Map<String, CompactorRequest> packing = new HashMap<>(); Review comment: `packing` sounds like a verb? Might rename it to `compactRequests`? ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -275,6 +339,18 @@ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) { return self(); } + public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) { + this.compactStrategy = strategy; + // we always commit before compacting, so hide the file written by writer + this.outputFileConfig = Review comment: We might not directly modify the outputFileConfig here since users could still override the config after enableCompact. We might move the prefix change to the point where we create the writers.
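A minimal sketch of that alternative, assuming a nullable `compactStrategy` field and the existing `OutputFileConfig` builder; the names here are illustrative, not from this PR:

```java
public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
    // only record the settings; do not touch outputFileConfig here
    this.compactStrategy = checkNotNull(strategy);
    this.fileCompactor = checkNotNull(compactor);
    return self();
}

// later, at writer creation time, derive the hidden-file config on the fly:
OutputFileConfig writerFileConfig =
        compactStrategy == null
                ? outputFileConfig
                : OutputFileConfig.builder()
                        .withPartPrefix("." + outputFileConfig.getPartPrefix())
                        .withPartSuffix(outputFileConfig.getPartSuffix())
                        .build();
```

This way the user-facing `outputFileConfig` keeps whatever the user set last, regardless of the order of the `enableCompact` and `withOutputFileConfig` calls.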
########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil basePath, bulkWriterFactory, new DateTimeBucketAssigner<>()); } + public BucketWriter<IN, String> createBucketWriter() throws IOException { Review comment: I tend to not expose new methods in `FileSink`. Is it possible to pass `SerializableSupplier<FileCompactor>` and so on for the fields required to create the `CompactorOperatorFactory`? Or directly pass a `SerializableSupplier<CompactorOperator>`? ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Coordinator that coordinates file compaction for the {@link FileSink}. + * + * <p>All committable emitted from the writers are collected and packed into {@link + * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired.
When + * a firing condition is met, the requests will be sent to the {@link CompactorOperator}. + * + * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a + * request at any time. A {@link CompactorOperator} must ensure that the ownership of the + * committable in a compact request is successfully handed from the coordinator, before it can + * actually perform the compaction. + */ +public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest> + implements OneInputStreamOperator< + CommittableMessage<FileSinkCommittable>, CompactorRequest>, + BoundedOneInput { + + private static final ListStateDescriptor<byte[]> REMAINING_COMMITTABLE_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_compact_commt_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final FileCompactStrategy strategy; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final Map<String, CompactorRequest> packing = new HashMap<>(); + private final Map<String, CompactTrigger> triggers = new HashMap<>(); + + private ListState<FileSinkCommittable> remainingCommittableState; + + public CompactCoordinator( + FileCompactStrategy strategy, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer) { + this.strategy = strategy; + this.committableSerializer = checkNotNull(committableSerializer); + } + + @Override + public void processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element) + throws Exception { + CommittableMessage<FileSinkCommittable> message = element.getValue(); + if (message instanceof CommittableWithLineage) { + FileSinkCommittable committable = + ((CommittableWithLineage<FileSinkCommittable>) element.getValue()) + .getCommittable(); + String bucketId = committable.getBucketId(); + if (packAndTrigger(bucketId, committable)) { + fireAndPurge(bucketId); + } + } + // or message instanceof CommittableSummary + // info in CommittableSummary is not necessary for compacting at present, ignore it + } + + private boolean packAndTrigger(String bucketId, FileSinkCommittable committable) { Review comment: nit: I think it might not need to pass `bukcetId` separately, it is a part of `committable` ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An operator that perform compaction for the {@link FileSink}. + * + * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and + * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all + * requests received before can be submitted. The results can be emitted at the next {@link + * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers + * can receive only one CommittableSummary and the corresponding number of Committable for a single + * checkpoint. 
+ */ +public class CompactorOperator + extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> + implements OneInputStreamOperator< + CompactorRequest, CommittableMessage<FileSinkCommittable>>, + BoundedOneInput, + CheckpointListener { + + private static final String COMPACTED_PREFIX = "compacted-"; + private static final long SUBMITTED_ID = -1L; + + private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final int compactThreads; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final FileCompactor fileCompactor; + private final BucketWriter<?, String> bucketWriter; + + private transient ExecutorService compactService; + + private List<CompactorRequest> collectingRequests = new ArrayList<>(); + private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>(); + private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> + compactingRequests = new LinkedList<>(); + + private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; + + public CompactorOperator( + int compactThreads, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, + FileCompactor fileCompactor, + BucketWriter<?, String> bucketWriter) { + this.compactThreads = compactThreads; + this.committableSerializer = committableSerializer; + this.fileCompactor = fileCompactor; + this.bucketWriter = bucketWriter; + } + + @Override + public void open() throws Exception { + super.open(); + this.compactService = + Executors.newFixedThreadPool( + Math.max(1, Math.min(compactThreads, Hardware.getNumberCPUCores())), + new ExecutorThreadFactory("compact-executor")); + } + + @Override + public void processElement(StreamRecord<CompactorRequest> element) throws Exception { + collectingRequests.add(element.getValue()); + } + + @Override + public void endInput() throws Exception { + // add collecting requests into the final snapshot + snapshotRequests.put(Long.MAX_VALUE, collectingRequests); + collectingRequests = new ArrayList<>(); + + // submit all requests and wait until they are done + submitUntil(Long.MAX_VALUE); + assert snapshotRequests.isEmpty(); + + getAllTasksFuture().join(); + emitCompacted(null); + assert compactingRequests.isEmpty(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + submitUntil(checkpointId); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + emitCompacted(checkpointId); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + // add collecting requests during the checkpoint into the snapshot + snapshotRequests.put(context.getCheckpointId(), collectingRequests); + collectingRequests = new ArrayList<>(); + + // snapshot all compacting requests as well, including the requests that are not finished + // when invoking prepareSnapshotPreBarrier but finished now, since they are not emitted yet + Map<Long, List<CompactorRequest>> requests = new HashMap<>(snapshotRequests); + requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>()) + .addAll(compactingRequests.stream().map(r -> r.f0).collect(Collectors.toList())); + remainingRequestsState.update(Collections.singletonList(requests)); + } + + @Override + public 
void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + remainingRequestsState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(REMAINING_REQUESTS_RAW_STATES_DESC), + new RemainingRequestsSerializer( + new CompactorRequestSerializer(committableSerializer))); + + Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = remainingRequestsState.get(); + if (stateRemaining != null) { + for (Map<Long, List<CompactorRequest>> requests : stateRemaining) { + // elements can be more than one when redistributed after parallelism changing + for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) { + List<CompactorRequest> list = + snapshotRequests.computeIfAbsent(e.getKey(), id -> new ArrayList<>()); + list.addAll(e.getValue()); + } + } + } + // submit all requests that is already submitted before this checkpoint + submitUntil(SUBMITTED_ID); + } + + private void submitUntil(long checkpointId) { + NavigableMap<Long, List<CompactorRequest>> canSubmit = + snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, true); + Iterator<Entry<Long, List<CompactorRequest>>> iter = canSubmit.entrySet().iterator(); Review comment: nit: might not use part of a word (e.g. `iter`) as a variable name. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An operator that perform compaction for the {@link FileSink}. + * + * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and + * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all + * requests received before can be submitted. The results can be emitted at the next {@link + * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers + * can receive only one CommittableSummary and the corresponding number of Committable for a single + * checkpoint. 
+ */ +public class CompactorOperator + extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> + implements OneInputStreamOperator< + CompactorRequest, CommittableMessage<FileSinkCommittable>>, + BoundedOneInput, + CheckpointListener { + + private static final String COMPACTED_PREFIX = "compacted-"; + private static final long SUBMITTED_ID = -1L; + + private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final int compactThreads; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final FileCompactor fileCompactor; + private final BucketWriter<?, String> bucketWriter; + + private transient ExecutorService compactService; + + private List<CompactorRequest> collectingRequests = new ArrayList<>(); Review comment: Might we add comments describing these collections and how they interact? (One possible wording is sketched after the next quoted hunk.) ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * An operator that perform compaction for the {@link FileSink}. + * + * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and + * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all + * requests received before can be submitted. The results can be emitted at the next {@link + * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers + * can receive only one CommittableSummary and the corresponding number of Committable for a single + * checkpoint. 
+ */ +public class CompactorOperator + extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> + implements OneInputStreamOperator< + CompactorRequest, CommittableMessage<FileSinkCommittable>>, + BoundedOneInput, + CheckpointListener { + + private static final String COMPACTED_PREFIX = "compacted-"; + private static final long SUBMITTED_ID = -1L; + + private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final int compactThreads; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final FileCompactor fileCompactor; + private final BucketWriter<?, String> bucketWriter; + + private transient ExecutorService compactService; + + private List<CompactorRequest> collectingRequests = new ArrayList<>(); + private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>(); + private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> + compactingRequests = new LinkedList<>(); + + private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; + + public CompactorOperator( + int compactThreads, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, + FileCompactor fileCompactor, + BucketWriter<?, String> bucketWriter) { + this.compactThreads = compactThreads; + this.committableSerializer = committableSerializer; + this.fileCompactor = fileCompactor; + this.bucketWriter = bucketWriter; + } + + @Override + public void open() throws Exception { + super.open(); + this.compactService = + Executors.newFixedThreadPool( + Math.max(1, Math.min(compactThreads, Hardware.getNumberCPUCores())), + new ExecutorThreadFactory("compact-executor")); + } + + @Override + public void processElement(StreamRecord<CompactorRequest> element) throws Exception { + collectingRequests.add(element.getValue()); + } + + @Override + public void endInput() throws Exception { + // add collecting requests into the final snapshot + snapshotRequests.put(Long.MAX_VALUE, collectingRequests); + collectingRequests = new ArrayList<>(); + + // submit all requests and wait until they are done + submitUntil(Long.MAX_VALUE); + assert snapshotRequests.isEmpty(); + + getAllTasksFuture().join(); + emitCompacted(null); + assert compactingRequests.isEmpty(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + submitUntil(checkpointId); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + emitCompacted(checkpointId); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + // add collecting requests during the checkpoint into the snapshot + snapshotRequests.put(context.getCheckpointId(), collectingRequests); + collectingRequests = new ArrayList<>(); + + // snapshot all compacting requests as well, including the requests that are not finished + // when invoking prepareSnapshotPreBarrier but finished now, since they are not emitted yet + Map<Long, List<CompactorRequest>> requests = new HashMap<>(snapshotRequests); + requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>()) + .addAll(compactingRequests.stream().map(r -> r.f0).collect(Collectors.toList())); + remainingRequestsState.update(Collections.singletonList(requests)); + } + + @Override + public 
void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + remainingRequestsState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(REMAINING_REQUESTS_RAW_STATES_DESC), + new RemainingRequestsSerializer( + new CompactorRequestSerializer(committableSerializer))); + + Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = remainingRequestsState.get(); + if (stateRemaining != null) { + for (Map<Long, List<CompactorRequest>> requests : stateRemaining) { + // elements can be more than one when redistributed after parallelism changing + for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) { + List<CompactorRequest> list = + snapshotRequests.computeIfAbsent(e.getKey(), id -> new ArrayList<>()); + list.addAll(e.getValue()); + } + } + } + // submit all requests that is already submitted before this checkpoint + submitUntil(SUBMITTED_ID); + } + + private void submitUntil(long checkpointId) { + NavigableMap<Long, List<CompactorRequest>> canSubmit = + snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, true); + Iterator<Entry<Long, List<CompactorRequest>>> iter = canSubmit.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Long, List<CompactorRequest>> requestEntry = iter.next(); + for (CompactorRequest req : requestEntry.getValue()) { + CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = + new CompletableFuture<>(); + compactingRequests.add(new Tuple2<>(req, resultFuture)); + compactService.submit( + () -> { + try { + Iterable<FileSinkCommittable> result = compact(req); + resultFuture.complete(result); + } catch (Exception e) { + resultFuture.completeExceptionally(e); + } + }); + } + iter.remove(); + } + } + + private Iterable<FileSinkCommittable> compact(CompactorRequest request) throws Exception { + List<FileSinkCommittable> results = new ArrayList<>(request.getCommittableToPassthrough()); + + List<Path> compactingFiles = getCompactingPath(request, results); + if (compactingFiles.isEmpty()) { + return results; + } + + Path targetPath = assembleCompactedFilePath(compactingFiles.get(0)); + CompactingFileWriter compactingFileWriter = + bucketWriter.openNewCompactingFile( + fileCompactor.getWriterType(), + request.getBucketId(), + targetPath, + System.currentTimeMillis()); + fileCompactor.compact(compactingFiles, compactingFileWriter); + PendingFileRecoverable compactedPendingFile = compactingFileWriter.closeForCommit(); + + FileSinkCommittable compacted = + new FileSinkCommittable(request.getBucketId(), compactedPendingFile); + results.add(0, compacted); + for (Path f : compactingFiles) { + // cleanup compacted files + results.add(new FileSinkCommittable(request.getBucketId(), f)); + } + + return results; + } + + // results: side output pass through committable + private List<Path> getCompactingPath( + CompactorRequest request, List<FileSinkCommittable> results) throws IOException { + List<FileSinkCommittable> compactingCommittable = request.getCommittableToCompact(); + List<Path> compactingFiles = new ArrayList<>(); + + for (FileSinkCommittable committable : compactingCommittable) { + PendingFileRecoverable pendingFile = committable.getPendingFile(); + if (pendingFile == null + || pendingFile.getPath() == null + || !pendingFile.getPath().getName().startsWith(".")) { Review comment: Why don't we pass these pending files through in the coordinator instead? If they are only filtered out here, the `CompactStrategy` may end up not being fulfilled, since the coordinator still counted these files when assembling the request. (A possible coordinator-side check is sketched below.) 
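To make that suggestion concrete, here is a minimal sketch of a coordinator-side check, assuming the coordinator inspects each `FileSinkCommittable` before packing it into a `CompactorRequest`. The helper name `canBeCompacted` and its placement in `CompactCoordinator` are illustrative, not part of this PR:

```java
// Hypothetical helper for CompactCoordinator: decide up front whether a
// committable can take part in compaction. Non-compactable committables would
// be passed through immediately and would not count toward the
// FileCompactStrategy thresholds. The accessors mirror the operator code above.
private static boolean canBeCompacted(FileSinkCommittable committable) {
    PendingFileRecoverable pendingFile = committable.getPendingFile();
    // A file written by an older writer version, or a file that becomes visible
    // as soon as it is committed (no leading dot), cannot be compacted.
    return pendingFile != null
            && pendingFile.getPath() != null
            && pendingFile.getPath().getName().startsWith(".");
}
```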
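And returning to the earlier comment on `CompactorOperator`'s collections, one possible wording for the requested comments, inferred from how the fields are used in this class:

```java
/** Requests received since the last checkpoint, not yet covered by any state snapshot. */
private List<CompactorRequest> collectingRequests = new ArrayList<>();

/** Snapshotted requests per checkpoint id, awaiting the completion of that checkpoint. */
private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>();

/** Submitted requests, paired with the futures of their asynchronous compaction results. */
private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
        compactingRequests = new LinkedList<>();
```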
########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.compactor.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; 
+import java.util.stream.Collectors; + +/** + * An operator that perform compaction for the {@link FileSink}. + * + * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and + * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all + * requests received before can be submitted. The results can be emitted at the next {@link + * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers + * can receive only one CommittableSummary and the corresponding number of Committable for a single + * checkpoint. + */ +public class CompactorOperator + extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>> + implements OneInputStreamOperator< + CompactorRequest, CommittableMessage<FileSinkCommittable>>, + BoundedOneInput, + CheckpointListener { + + private static final String COMPACTED_PREFIX = "compacted-"; + private static final long SUBMITTED_ID = -1L; + + private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC = + new ListStateDescriptor<>( + "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE); + + private final int compactThreads; + private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer; + + private final FileCompactor fileCompactor; + private final BucketWriter<?, String> bucketWriter; + + private transient ExecutorService compactService; + + private List<CompactorRequest> collectingRequests = new ArrayList<>(); + private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>(); + private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> + compactingRequests = new LinkedList<>(); + + private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState; + + public CompactorOperator( + int compactThreads, + SimpleVersionedSerializer<FileSinkCommittable> committableSerializer, + FileCompactor fileCompactor, + BucketWriter<?, String> bucketWriter) { + this.compactThreads = compactThreads; + this.committableSerializer = committableSerializer; + this.fileCompactor = fileCompactor; + this.bucketWriter = bucketWriter; + } + + @Override + public void open() throws Exception { + super.open(); + this.compactService = + Executors.newFixedThreadPool( + Math.max(1, Math.min(compactThreads, Hardware.getNumberCPUCores())), + new ExecutorThreadFactory("compact-executor")); + } + + @Override + public void processElement(StreamRecord<CompactorRequest> element) throws Exception { + collectingRequests.add(element.getValue()); + } + + @Override + public void endInput() throws Exception { + // add collecting requests into the final snapshot + snapshotRequests.put(Long.MAX_VALUE, collectingRequests); + collectingRequests = new ArrayList<>(); + + // submit all requests and wait until they are done + submitUntil(Long.MAX_VALUE); + assert snapshotRequests.isEmpty(); + + getAllTasksFuture().join(); + emitCompacted(null); + assert compactingRequests.isEmpty(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + submitUntil(checkpointId); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + emitCompacted(checkpointId); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + // add collecting requests during the checkpoint into the snapshot + 
snapshotRequests.put(context.getCheckpointId(), collectingRequests); + collectingRequests = new ArrayList<>(); + + // snapshot all compacting requests as well, including the requests that are not finished + // when invoking prepareSnapshotPreBarrier but finished now, since they are not emitted yet + Map<Long, List<CompactorRequest>> requests = new HashMap<>(snapshotRequests); + requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>()) + .addAll(compactingRequests.stream().map(r -> r.f0).collect(Collectors.toList())); + remainingRequestsState.update(Collections.singletonList(requests)); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + remainingRequestsState = + new SimpleVersionedListState<>( + context.getOperatorStateStore() + .getListState(REMAINING_REQUESTS_RAW_STATES_DESC), + new RemainingRequestsSerializer( + new CompactorRequestSerializer(committableSerializer))); + + Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = remainingRequestsState.get(); + if (stateRemaining != null) { + for (Map<Long, List<CompactorRequest>> requests : stateRemaining) { + // elements can be more than one when redistributed after parallelism changing + for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) { + List<CompactorRequest> list = + snapshotRequests.computeIfAbsent(e.getKey(), id -> new ArrayList<>()); + list.addAll(e.getValue()); + } + } + } + // submit all requests that is already submitted before this checkpoint + submitUntil(SUBMITTED_ID); + } + + private void submitUntil(long checkpointId) { + NavigableMap<Long, List<CompactorRequest>> canSubmit = + snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, true); + Iterator<Entry<Long, List<CompactorRequest>>> iter = canSubmit.entrySet().iterator(); + while (iter.hasNext()) { + Entry<Long, List<CompactorRequest>> requestEntry = iter.next(); + for (CompactorRequest req : requestEntry.getValue()) { + CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = + new CompletableFuture<>(); + compactingRequests.add(new Tuple2<>(req, resultFuture)); + compactService.submit( + () -> { + try { + Iterable<FileSinkCommittable> result = compact(req); + resultFuture.complete(result); + } catch (Exception e) { + resultFuture.completeExceptionally(e); + } + }); + } + iter.remove(); + } + } + + private Iterable<FileSinkCommittable> compact(CompactorRequest request) throws Exception { + List<FileSinkCommittable> results = new ArrayList<>(request.getCommittableToPassthrough()); + + List<Path> compactingFiles = getCompactingPath(request, results); + if (compactingFiles.isEmpty()) { + return results; + } + + Path targetPath = assembleCompactedFilePath(compactingFiles.get(0)); + CompactingFileWriter compactingFileWriter = + bucketWriter.openNewCompactingFile( + fileCompactor.getWriterType(), + request.getBucketId(), + targetPath, + System.currentTimeMillis()); + fileCompactor.compact(compactingFiles, compactingFileWriter); + PendingFileRecoverable compactedPendingFile = compactingFileWriter.closeForCommit(); + + FileSinkCommittable compacted = + new FileSinkCommittable(request.getBucketId(), compactedPendingFile); + results.add(0, compacted); + for (Path f : compactingFiles) { + // cleanup compacted files + results.add(new FileSinkCommittable(request.getBucketId(), f)); + } + + return results; + } + + // results: side output pass through committable + private List<Path> getCompactingPath( + CompactorRequest request, 
List<FileSinkCommittable> results) throws IOException { + List<FileSinkCommittable> compactingCommittable = request.getCommittableToCompact(); + List<Path> compactingFiles = new ArrayList<>(); + + for (FileSinkCommittable committable : compactingCommittable) { + PendingFileRecoverable pendingFile = committable.getPendingFile(); + if (pendingFile == null + || pendingFile.getPath() == null + || !pendingFile.getPath().getName().startsWith(".")) { + // the file may be written with writer of elder version, or + // the file will be visible once committed, so it can not be compacted. + // pass through, add to results, do not add to compacting files + results.add(committable); + } else { + // commit the pending file and compact the committed file + bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery(); + compactingFiles.add(committable.getPendingFile().getPath()); + } + } + return compactingFiles; + } + + private static Path assembleCompactedFilePath(Path uncompactedPath) { + String uncompactedName = uncompactedPath.getName(); + if (uncompactedName.startsWith(".")) { + uncompactedName = uncompactedName.substring(1); + } + return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review comment: Do we need to add the prefix at all? Perhaps we could keep the naming from the user's `outputFileNameConfig` instead? (One possible variant is sketched below.)
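If the user's naming should be kept, one hedged variant, assuming the sink's `OutputFileConfig` could be handed to this operator (the wiring is not shown here):

```java
// Hypothetical alternative: derive the compacted file's name from the user's
// configured part-file prefix rather than a hard-coded "compacted-" marker.
// OutputFileConfig#getPartPrefix() exists in flink-file-sink-common; passing
// the config into this operator is an assumption of this sketch.
private static Path assembleCompactedFilePath(
        Path uncompactedPath, OutputFileConfig outputFileConfig) {
    String uncompactedName = uncompactedPath.getName();
    if (uncompactedName.startsWith(".")) {
        uncompactedName = uncompactedName.substring(1);
    }
    return new Path(
            uncompactedPath.getParent(),
            outputFileConfig.getPartPrefix() + "-" + uncompactedName);
}
```

Whether the result still needs some distinguishing marker, so that a compacted file is never confused with an ordinary part file, is the open question of this thread.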
