This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e70c03495d528a067b166c3de38cd9258f68a6fa Author: Yuxin Tan <[email protected]> AuthorDate: Mon May 8 13:46:40 2023 +0800 [FLINK-31635][network] Introduce the identifiers in the tiered storage --- .../io/network/partition/ResultPartitionID.java | 23 +++++++ .../TieredStorageBytesBasedDataIdentifier.java | 76 ++++++++++++++++++++++ .../tiered/common/TieredStorageDataIdentifier.java | 22 +++++++ .../tiered/common/TieredStorageIdMappingUtils.java | 51 +++++++++++++++ .../tiered/common/TieredStoragePartitionId.java | 40 ++++++++++++ .../tiered/common/TieredStorageSubpartitionId.java | 65 ++++++++++++++++++ .../hybrid/tiered/common/TieredStorageTopicId.java | 40 ++++++++++++ .../common/TieredStorageIdMappingUtilsTest.java | 58 +++++++++++++++++ 8 files changed, 375 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java index 5355ab3992e..116d34653dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java @@ -23,6 +23,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; + import java.io.Serializable; import static org.apache.flink.runtime.executiongraph.ExecutionAttemptID.randomId; @@ -54,6 +57,15 @@ public final class ResultPartitionID implements Serializable { this.producerId = checkNotNull(producerId); } + public ResultPartitionID(byte[] bytes) { + ByteBuf byteBuf = Unpooled.buffer(); + byteBuf.writeBytes(bytes); + + this.partitionId = IntermediateResultPartitionID.fromByteBuf(byteBuf); + this.producerId = ExecutionAttemptID.fromByteBuf(byteBuf); + byteBuf.release(); + } + public IntermediateResultPartitionID getPartitionId() { return partitionId; } @@ -82,4 +94,15 @@ public final class ResultPartitionID implements Serializable { public String toString() { return partitionId.toString() + "@" + producerId.toString(); } + + public byte[] getBytes() { + ByteBuf byteBuf = Unpooled.buffer(); + partitionId.writeTo(byteBuf); + producerId.writeTo(byteBuf); + + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + byteBuf.release(); + return bytes; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageBytesBasedDataIdentifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageBytesBasedDataIdentifier.java new file mode 100644 index 00000000000..779530236ea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageBytesBasedDataIdentifier.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.util.StringUtils; + +import java.io.Serializable; +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The bytes based unique identification for the Tiered Storage. */ +public abstract class TieredStorageBytesBasedDataIdentifier + implements TieredStorageDataIdentifier, Serializable { + + private static final long serialVersionUID = 1L; + + /** The bytes data of this identifier. */ + protected final byte[] bytes; + + protected final int hashCode; + + public TieredStorageBytesBasedDataIdentifier(byte[] bytes) { + checkArgument(bytes != null, "Must be not null."); + + this.bytes = bytes; + this.hashCode = Arrays.hashCode(bytes); + } + + public byte[] getBytes() { + return bytes; + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + + if (that == null || getClass() != that.getClass()) { + return false; + } + + TieredStorageBytesBasedDataIdentifier thatID = (TieredStorageBytesBasedDataIdentifier) that; + return hashCode == thatID.hashCode && Arrays.equals(bytes, thatID.bytes); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return "TieredStorageBytesBasedDataIdentifier{" + + "ID=" + + StringUtils.byteToHexString(bytes) + + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageDataIdentifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageDataIdentifier.java new file mode 100644 index 00000000000..b07e55f214d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageDataIdentifier.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +/** Identifier interface in the Tiered Storage. */ +public interface TieredStorageDataIdentifier {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java new file mode 100644 index 00000000000..21e44719875 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.util.AbstractID; + +/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. */ +public class TieredStorageIdMappingUtils { + + public static TieredStorageTopicId convertId(IntermediateDataSetID intermediateDataSetID) { + return new TieredStorageTopicId(intermediateDataSetID.getBytes()); + } + + public static IntermediateDataSetID convertId(TieredStorageTopicId topicId) { + return new IntermediateDataSetID(new AbstractID(topicId.getBytes())); + } + + public static TieredStoragePartitionId convertId(ResultPartitionID resultPartitionId) { + return new TieredStoragePartitionId(resultPartitionId.getBytes()); + } + + public static ResultPartitionID convertId(TieredStoragePartitionId partitionId) { + return new ResultPartitionID(partitionId.getBytes()); + } + + public static TieredStorageSubpartitionId convertId(int subpartitionId) { + return new TieredStorageSubpartitionId(subpartitionId); + } + + public static int convertId(TieredStorageSubpartitionId subpartitionId) { + return subpartitionId.getSubpartitionId(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStoragePartitionId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStoragePartitionId.java new file mode 100644 index 00000000000..0d130f59bee --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStoragePartitionId.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.util.StringUtils; + +/** + * Identifier of a partition. + * + * <p>A partition is equivalent to a result partition in Flink. + */ +public class TieredStoragePartitionId extends TieredStorageBytesBasedDataIdentifier { + + private static final long serialVersionUID = 1L; + + public TieredStoragePartitionId(byte[] bytes) { + super(bytes); + } + + @Override + public String toString() { + return "TieredStoragePartitionId{" + "ID=" + StringUtils.byteToHexString(bytes) + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageSubpartitionId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageSubpartitionId.java new file mode 100644 index 00000000000..2cb0f4f155f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageSubpartitionId.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import java.io.Serializable; + +/** + * Identifier of a subpartition. + * + * <p>A subpartition is equivalent to a subpartition in Flink. + */ +public class TieredStorageSubpartitionId implements TieredStorageDataIdentifier, Serializable { + + private static final long serialVersionUID = 1L; + + private final int subpartitionId; + + public TieredStorageSubpartitionId(int subpartitionId) { + this.subpartitionId = subpartitionId; + } + + public int getSubpartitionId() { + return subpartitionId; + } + + @Override + public int hashCode() { + return subpartitionId; + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + + if (that == null || getClass() != that.getClass()) { + return false; + } + + TieredStorageSubpartitionId thatID = (TieredStorageSubpartitionId) that; + return subpartitionId == thatID.subpartitionId; + } + + @Override + public String toString() { + return "TieredStorageSubpartitionId{" + "ID=" + subpartitionId + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageTopicId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageTopicId.java new file mode 100644 index 00000000000..f686a877ee8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageTopicId.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.util.StringUtils; + +/** + * Identifier of a topic. + * + * <p>A topic is equivalent to an intermediate data set in Flink. + */ +public class TieredStorageTopicId extends TieredStorageBytesBasedDataIdentifier { + + private static final long serialVersionUID = 1L; + + public TieredStorageTopicId(byte[] bytes) { + super(bytes); + } + + @Override + public String toString() { + return "TieredStorageTopicId{" + "ID=" + StringUtils.byteToHexString(bytes) + '}'; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtilsTest.java new file mode 100644 index 00000000000..f8363969af5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtilsTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TieredStorageIdMappingUtils}. */ +public class TieredStorageIdMappingUtilsTest { + + @Test + void testConvertDataSetId() { + IntermediateDataSetID dataSetID = new IntermediateDataSetID(); + TieredStorageTopicId topicId = TieredStorageIdMappingUtils.convertId(dataSetID); + IntermediateDataSetID convertedDataSetID = TieredStorageIdMappingUtils.convertId(topicId); + assertThat(dataSetID).isEqualTo(convertedDataSetID); + } + + @Test + void testConvertResultPartitionId() { + ResultPartitionID resultPartitionID = new ResultPartitionID(); + TieredStoragePartitionId tieredStoragePartitionId = + TieredStorageIdMappingUtils.convertId(resultPartitionID); + ResultPartitionID convertedResultPartitionID = + TieredStorageIdMappingUtils.convertId(tieredStoragePartitionId); + assertThat(resultPartitionID).isEqualTo(convertedResultPartitionID); + } + + @Test + void testConvertSubpartitionId() { + int subpartitionId = 2; + TieredStorageSubpartitionId tieredStorageSubpartitionId = + TieredStorageIdMappingUtils.convertId(subpartitionId); + int convertedSubpartitionId = + TieredStorageIdMappingUtils.convertId(tieredStorageSubpartitionId); + assertThat(subpartitionId).isEqualTo(convertedSubpartitionId); + } +}
