This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7aba50fe56aa8b6b6ef2474ed1856aa87d92242c Author: Yuxin Tan <[email protected]> AuthorDate: Mon May 8 13:51:56 2023 +0800 [FLINK-31635][network] Introduce the multiple storage clients for the tiered storage --- .../hybrid/tiered/storage/BufferAccumulator.java | 59 ++++++++++ .../storage/TieredStorageConsumerClient.java | 22 ++++ .../tiered/storage/TieredStorageMasterClient.java | 42 +++++++ .../storage/TieredStorageProducerClient.java | 125 +++++++++++++++++++++ 4 files changed, 248 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java new file mode 100644 index 00000000000..e57f8383051 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/BufferAccumulator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.BiConsumer; + +/** + * Accumulates received records into buffers. The {@link BufferAccumulator} receives the records + * from tiered store producer and the records will accumulate and transform into buffers. + */ +public interface BufferAccumulator { + + /** + * Setup the accumulator. + * + * @param numSubpartitions number of subpartitions + * @param bufferFlusher accepts the accumulated buffers. The first field is the subpartition id, + * while the list in the second field contains accumulated buffers in order for that + * subpartition. + */ + void setup( + int numSubpartitions, + BiConsumer<TieredStorageSubpartitionId, List<Buffer>> bufferFlusher); + + /** + * Receives the records from tiered store producer, these records will be accumulated and + * transformed into finished buffers. + */ + void receive( + ByteBuffer record, TieredStorageSubpartitionId subpartitionId, Buffer.DataType dataType) + throws IOException; + + /** + * Close the accumulator. This will flush all the remaining data and release all the resources. 
+ */ + void close(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java new file mode 100644 index 00000000000..f1b69c347dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +/** Client of the Tiered Storage used by the consumer. 
*/ +public class TieredStorageConsumerClient {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java new file mode 100644 index 00000000000..60f2c779f30 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; + +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; + +import java.util.List; + +/** Client of the Tiered Storage used by the master. 
*/ +public class TieredStorageMasterClient { + + private final List<TierMasterAgent> tiers; + + public TieredStorageMasterClient(List<TierMasterAgent> tiers) { + this.tiers = tiers; + } + + public void addPartition(TieredStoragePartitionId partitionId) { + tiers.forEach(tierMasterAgent -> tierMasterAgent.addPartition(partitionId)); + } + + public void releasePartition(TieredStoragePartitionId partitionId) { + tiers.forEach(tierMasterAgent -> tierMasterAgent.releasePartition(partitionId)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java new file mode 100644 index 00000000000..34a0f03f1e1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.util.ExceptionUtils;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * Client of the Tiered Storage used by the producer.
 *
 * <p>Records passed to {@link #write} are handed to a {@link BufferAccumulator}. The client
 * registers itself as the accumulator's flusher in the constructor, so the accumulated buffers come
 * back to it per subpartition.
 */
public class TieredStorageProducerClient {

    /** Whether the result partition is broadcast-only; see {@link #write}. */
    private final boolean isBroadcastOnly;

    /** Number of subpartitions; a broadcast record may be written once to each of them. */
    private final int numSubpartitions;

    /** Accumulates written records into buffers and flushes them back to this client. */
    private final BufferAccumulator bufferAccumulator;

    /** Optional compressor, may be {@code null}. Not yet read by any method of this class. */
    @Nullable private final BufferCompressor bufferCompressor;

    /** Producer agents of the tiers; each one is closed in {@link #close}. */
    private final List<TierProducerAgent> tierProducerAgents;

    /**
     * Creates the producer client and registers it as the flusher of {@code bufferAccumulator}.
     *
     * @param numSubpartitions number of subpartitions of the result partition
     * @param isBroadcastOnly whether the result partition is broadcast-only
     * @param bufferAccumulator accumulator that turns written records into buffers
     * @param bufferCompressor optional buffer compressor, may be {@code null}
     * @param tierProducerAgents producer agents of the tiers
     */
    public TieredStorageProducerClient(
            int numSubpartitions,
            boolean isBroadcastOnly,
            BufferAccumulator bufferAccumulator,
            @Nullable BufferCompressor bufferCompressor,
            List<TierProducerAgent> tierProducerAgents) {
        this.isBroadcastOnly = isBroadcastOnly;
        this.numSubpartitions = numSubpartitions;
        this.bufferAccumulator = bufferAccumulator;
        this.bufferCompressor = bufferCompressor;
        this.tierProducerAgents = tierProducerAgents;

        // Registered last: the flusher captures `this`, and every field it may touch is assigned.
        bufferAccumulator.setup(numSubpartitions, this::writeAccumulatedBuffers);
    }

    /**
     * Writes a record to the producer client. The {@link BufferAccumulator} accumulates the
     * records into buffers.
     *
     * <p>Note that {@code isBroadcast} indicates whether the record is broadcast, while {@code
     * isBroadcastOnly} indicates whether the result partition is broadcast-only. When the result
     * partition is not broadcast-only and the record is a broadcast record, the record is written
     * to every subpartition and {@code subpartitionId} is not used. Otherwise the record is
     * written once, to {@code subpartitionId}.
     *
     * @param record the written record data
     * @param subpartitionId the target subpartition; unused for broadcast records of a partition
     *     that is not broadcast-only
     * @param dataType the data type of the record
     * @param isBroadcast whether the record is a broadcast record
     * @throws IOException if the accumulator fails to receive the record
     */
    public void write(
            ByteBuffer record,
            TieredStorageSubpartitionId subpartitionId,
            Buffer.DataType dataType,
            boolean isBroadcast)
            throws IOException {

        if (isBroadcast && !isBroadcastOnly) {
            for (int i = 0; i < numSubpartitions; ++i) {
                // Every subpartition gets its own duplicate (independent position/limit over the
                // same content), addressed by its own id. Reusing the caller's subpartitionId here
                // would write all copies into one subpartition.
                bufferAccumulator.receive(
                        record.duplicate(), new TieredStorageSubpartitionId(i), dataType);
            }
        } else {
            bufferAccumulator.receive(record, subpartitionId, dataType);
        }
    }

    /**
     * Closes the accumulator and then every tier producer agent in list order.
     *
     * <p>The accumulator is closed first because its close flushes all the remaining data, which
     * arrives through the flusher registered in the constructor.
     */
    public void close() {
        bufferAccumulator.close();
        tierProducerAgents.forEach(TierProducerAgent::close);
    }

    /**
     * Writes the accumulated buffers of this subpartitionId to the appropriate tiers, in order.
     * This is the flusher registered with the {@link BufferAccumulator}.
     *
     * @param subpartitionId the subpartition identifier
     * @param accumulatedBuffers the accumulated buffers of this subpartition
     */
    private void writeAccumulatedBuffers(
            TieredStorageSubpartitionId subpartitionId, List<Buffer> accumulatedBuffers) {
        try {
            for (Buffer finishedBuffer : accumulatedBuffers) {
                writeAccumulatedBuffer(subpartitionId, finishedBuffer);
            }
        } catch (IOException e) {
            // The flusher is a BiConsumer and cannot declare IOException, so it is rethrown via
            // ExceptionUtils.rethrow.
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Writes the accumulated buffer of this subpartitionId to an appropriate tier. After the tier is
     * decided, the buffer will be written to the selected tier.
     *
     * <p>NOTE(review): the body is currently empty, so the buffer is not passed to any tier yet.
     *
     * @param subpartitionId the subpartition identifier
     * @param accumulatedBuffer one accumulated buffer of this subpartition
     * @throws IOException declared for the pending tier-writing implementation
     */
    private void writeAccumulatedBuffer(
            TieredStorageSubpartitionId subpartitionId, Buffer accumulatedBuffer)
            throws IOException {
        // TODO, Try to write the accumulated buffer to the appropriate tier. After the tier is
        // decided, then write the accumulated buffer to the tier.
    }
}
