gaoyunhaii commented on a change in pull request #18680:
URL: https://github.com/apache/flink/pull/18680#discussion_r803476471
##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that performs compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will first be held in memory, and
+ * snapshotted into the state of a checkpoint. When the checkpoint is successfully completed, all
+ * requests received before can be submitted. The results can be emitted at the next {@link
+ * #prepareSnapshotPreBarrier} invocation after the compaction is finished, to ensure that committers
+ * can receive only one CommittableSummary and the corresponding number of Committable for a single
+ * checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>();
+    private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
+
+    public CompactorOperator(
+            int compactThreads,
+            SimpleVersionedSerializer<FileSinkCommittable> committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.compactThreads = compactThreads;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =
+                Executors.newFixedThreadPool(
+                        Math.max(1, Math.min(compactThreads, Hardware.getNumberCPUCores())),
+                        new ExecutorThreadFactory("compact-executor"));
+    }
+
+    @Override
+    public void processElement(StreamRecord<CompactorRequest> element) throws Exception {
+        collectingRequests.add(element.getValue());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // add collecting requests into the final snapshot
+        snapshotRequests.put(Long.MAX_VALUE, collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // submit all requests and wait until they are done
+        submitUntil(Long.MAX_VALUE);
+        assert snapshotRequests.isEmpty();
+
+        getAllTasksFuture().join();
+        emitCompacted(null);
+        assert compactingRequests.isEmpty();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        submitUntil(checkpointId);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitCompacted(checkpointId);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // add collecting requests during the checkpoint into the snapshot
+        snapshotRequests.put(context.getCheckpointId(), collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // snapshot all compacting requests as well, including the requests that are not finished
+        // when invoking prepareSnapshotPreBarrier but finished now, since they are not emitted yet
+        Map<Long, List<CompactorRequest>> requests = new HashMap<>(snapshotRequests);
+        requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>())
+                .addAll(compactingRequests.stream().map(r -> r.f0).collect(Collectors.toList()));
+        remainingRequestsState.update(Collections.singletonList(requests));
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                .getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new CompactorRequestSerializer(committableSerializer)));
+
+        Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = remainingRequestsState.get();
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                // elements can be more than one when redistributed after parallelism changing
+                for (Map.Entry<Long, List<CompactorRequest>> e : requests.entrySet()) {
+                    List<CompactorRequest> list =
+                            snapshotRequests.computeIfAbsent(e.getKey(), id -> new ArrayList<>());
+                    list.addAll(e.getValue());
+                }
+            }
+        }
+        // submit all requests that were already submitted before this checkpoint
+        submitUntil(SUBMITTED_ID);
+    }
+
+    private void submitUntil(long checkpointId) {
+        NavigableMap<Long, List<CompactorRequest>> canSubmit =
+                snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, true);
+        Iterator<Entry<Long, List<CompactorRequest>>> iter = canSubmit.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<Long, List<CompactorRequest>> requestEntry = iter.next();
+            for (CompactorRequest req : requestEntry.getValue()) {
+                CompletableFuture<Iterable<FileSinkCommittable>> resultFuture =
+                        new CompletableFuture<>();
+                compactingRequests.add(new Tuple2<>(req, resultFuture));
+                compactService.submit(
+                        () -> {
+                            try {
+                                Iterable<FileSinkCommittable> result = compact(req);
+                                resultFuture.complete(result);
+                            } catch (Exception e) {
+                                resultFuture.completeExceptionally(e);
+                            }
+                        });
+            }
+            iter.remove();
+        }
+    }
+
+    private Iterable<FileSinkCommittable> compact(CompactorRequest request) throws Exception {
+        List<FileSinkCommittable> results = new ArrayList<>(request.getCommittableToPassthrough());
+
+        List<Path> compactingFiles = getCompactingPath(request, results);
+        if (compactingFiles.isEmpty()) {
+            return results;
+        }
+
+        Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
+        CompactingFileWriter compactingFileWriter =
+                bucketWriter.openNewCompactingFile(
+                        fileCompactor.getWriterType(),
+                        request.getBucketId(),
+                        targetPath,
+                        System.currentTimeMillis());
+        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        PendingFileRecoverable compactedPendingFile = compactingFileWriter.closeForCommit();
+
+        FileSinkCommittable compacted =
+                new FileSinkCommittable(request.getBucketId(), compactedPendingFile);
+        results.add(0, compacted);

Review comment:
   Why do we need to add it to the head of the list? Inserting an element at the head of an `ArrayList` is quite inefficient, since all existing elements have to be shifted.
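   If the compacted committable does not actually have to precede the passthrough committables, a plain append would avoid that. A minimal sketch of what I mean (purely illustrative, reusing the names from the diff above):

   ```java
   // results already holds the passthrough committables collected above.
   // add(0, compacted) has to shift every existing element of the ArrayList,
   // while a plain append is O(1) if the committer does not rely on ordering.
   results.add(compacted);
   ```

   If the compacted file really must come first, building the result list in that order up front (add the compacted committable, then addAll the passthrough ones) would read more clearly and avoid the mid-list insertion.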
##########
File path: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
##########
@@ -170,15 +177,38 @@ public WriterProperties getProperties() {
             implements PendingFileRecoverable {
         private final RecoverableWriter.CommitRecoverable commitRecoverable;
+        private final Path targetPath;

Review comment:
   @Nullable

##########
File path: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
##########
@@ -189,15 +219,36 @@ public OutputStreamBasedPendingFileRecoverable(
             implements InProgressFileRecoverable {
         private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
+        private final Path targetPath;

Review comment:
   @Nullable

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/DecoderBasedReader.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * A {@link RecordWiseFileCompactor.Reader} implementation that reads the file as an {@link
+ * FSDataInputStream} and decodes the record with the {@link Decoder}.
+ */
+public class DecoderBasedReader<T> implements RecordWiseFileCompactor.Reader<T> {
+    private final Decoder<T> decoder;
+    private final FSDataInputStream input;
+
+    public DecoderBasedReader(Path path, Decoder<T> decoder) throws IOException {
+        this.decoder = decoder;
+        this.input = path.getFileSystem().open(path);
+    }
+
+    @Override
+    public T read() throws IOException {
+        if (input.available() > 0) {
+            return decoder.decodeNext(input);
+        }
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        input.close();
+    }
+
+    /**
+     * A {@link Decoder} to decode the file content into the actual records.
+     *
+     * <p>A {@link Decoder} is generally the reverse of a {@link
+     * org.apache.flink.api.common.serialization.Encoder}.
+     *
+     * @param <T> The type of the records the reader is reading.
+     */
+    public interface Decoder<T> extends Serializable {
+        /**
+         * @return The next record decoded from the InputStream, or null if no more are available.
+         */
+        T decodeNext(InputStream input) throws IOException;

Review comment:
   Might split this into `open` and `decodeNext` to allow users to cache a reader easily.
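   A rough sketch of what that split could look like (just to illustrate the suggestion; the exact method names and the `open`/`close` pair are assumptions, not the final API):

   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.Serializable;

   /** Sketch only: a decoder that is opened once per stream, so the caller can cache and reuse it. */
   public interface Decoder<T> extends Serializable {

       /** Prepares the decoder to read from the given stream. */
       void open(InputStream input) throws IOException;

       /** @return The next decoded record, or null if no more records are available. */
       T decodeNext() throws IOException;

       /** Releases any resources held for the current stream. */
       void close() throws IOException;
   }
   ```

   With that shape, `DecoderBasedReader` could keep a single `Decoder` instance and simply call `open` again for each new file instead of recreating the decoder.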
