JingsongLi commented on a change in pull request #13744: URL: https://github.com/apache/flink/pull/13744#discussion_r512392582
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactMessages.java ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.filesystem.stream.compact; + +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Util class for all compaction messages. + * + * <p>The compaction operator graph is: + * TempFileWriter|parallel ---(InputFile&EndInputFile)---> CompactCoordinator|non-parallel + * ---(CompactionUnit&EndCompaction)--->CompactOperator|parallel---(PartitionCommitInfo)---> + * PartitionCommitter|non-parallel + * + * <p>Because the end message is a kind of barrier of record messages, they can only be transmitted + * in the way of full broadcast in the link from coordinator to compact operator. + */ +public class CompactMessages { + private CompactMessages() {} + + /** + * The input of compact coordinator. + */ + public interface CoordinatorInput extends Serializable {} + + /** + * A partitioned input file. 
+ */ + public static class InputFile implements CoordinatorInput { + private final String partition; + private final Path file; + + public InputFile(String partition, Path file) { + this.partition = partition; + this.file = file; + } + + public String getPartition() { + return partition; + } + + public Path getFile() { + return file; + } + } + + /** + * A flag to end file input. + */ + public static class EndInputFile implements CoordinatorInput { + private final long checkpointId; + private final int taskId; + private final int numberOfTasks; + + public EndInputFile(long checkpointId, int taskId, int numberOfTasks) { + this.checkpointId = checkpointId; + this.taskId = taskId; + this.numberOfTasks = numberOfTasks; + } + + public long getCheckpointId() { + return checkpointId; + } + + public int getTaskId() { + return taskId; + } + + public int getNumberOfTasks() { + return numberOfTasks; + } + } + + /** + * The output of compact coordinator. + */ + public interface CoordinatorOutput extends Serializable {} + + /** + * The unit of a single compaction. + */ + public static class CompactionUnit implements CoordinatorOutput { + + private final int unitId; + private final String partition; + + // Store strings to improve serialization performance. + private final String[] pathStrings; + + public CompactionUnit(int unitId, String partition, List<Path> unit) { + this.unitId = unitId; + this.partition = partition; + this.pathStrings = unit.stream() + .map(Path::toUri) + .map(URI::toString) + .toArray(String[]::new); + } + + public boolean isTaskMessage(int taskId) { Review comment: Good catch, there is a bug here, should be: ``` public boolean isTaskMessage(int taskNumber, int taskId) { return unitId % taskNumber == taskId; } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org
