This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e70c03495d528a067b166c3de38cd9258f68a6fa
Author: Yuxin Tan <[email protected]>
AuthorDate: Mon May 8 13:46:40 2023 +0800

    [FLINK-31635][network] Introduce the identifiers in the tiered storage
---
 .../io/network/partition/ResultPartitionID.java    | 23 +++++++
 .../TieredStorageBytesBasedDataIdentifier.java     | 76 ++++++++++++++++++++++
 .../tiered/common/TieredStorageDataIdentifier.java | 22 +++++++
 .../tiered/common/TieredStorageIdMappingUtils.java | 51 +++++++++++++++
 .../tiered/common/TieredStoragePartitionId.java    | 40 ++++++++++++
 .../tiered/common/TieredStorageSubpartitionId.java | 65 ++++++++++++++++++
 .../hybrid/tiered/common/TieredStorageTopicId.java | 40 ++++++++++++
 .../common/TieredStorageIdMappingUtilsTest.java    | 58 +++++++++++++++++
 8 files changed, 375 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index 5355ab3992e..116d34653dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -23,6 +23,9 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+
 import java.io.Serializable;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID.randomId;
@@ -54,6 +57,15 @@ public final class ResultPartitionID implements Serializable 
{
         this.producerId = checkNotNull(producerId);
     }
 
+    public ResultPartitionID(byte[] bytes) {
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeBytes(bytes);
+
+        this.partitionId = IntermediateResultPartitionID.fromByteBuf(byteBuf);
+        this.producerId = ExecutionAttemptID.fromByteBuf(byteBuf);
+        byteBuf.release();
+    }
+
     public IntermediateResultPartitionID getPartitionId() {
         return partitionId;
     }
@@ -82,4 +94,15 @@ public final class ResultPartitionID implements Serializable 
{
     public String toString() {
         return partitionId.toString() + "@" + producerId.toString();
     }
+
+    public byte[] getBytes() {
+        ByteBuf byteBuf = Unpooled.buffer();
+        partitionId.writeTo(byteBuf);
+        producerId.writeTo(byteBuf);
+
+        byte[] bytes = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(bytes);
+        byteBuf.release();
+        return bytes;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageBytesBasedDataIdentifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageBytesBasedDataIdentifier.java
new file mode 100644
index 00000000000..779530236ea
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageBytesBasedDataIdentifier.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The bytes based unique identification for the Tiered Storage. */
+public abstract class TieredStorageBytesBasedDataIdentifier
+        implements TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The bytes data of this identifier. */
+    protected final byte[] bytes;
+
+    protected final int hashCode;
+
+    public TieredStorageBytesBasedDataIdentifier(byte[] bytes) {
+        checkArgument(bytes != null, "Must be not null.");
+
+        this.bytes = bytes;
+        this.hashCode = Arrays.hashCode(bytes);
+    }
+
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+
+        if (that == null || getClass() != that.getClass()) {
+            return false;
+        }
+
+        TieredStorageBytesBasedDataIdentifier thatID = 
(TieredStorageBytesBasedDataIdentifier) that;
+        return hashCode == thatID.hashCode && Arrays.equals(bytes, 
thatID.bytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public String toString() {
+        return "TieredStorageBytesBasedDataIdentifier{"
+                + "ID="
+                + StringUtils.byteToHexString(bytes)
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageDataIdentifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageDataIdentifier.java
new file mode 100644
index 00000000000..b07e55f214d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageDataIdentifier.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+/** Identifier interface in the Tiered Storage. */
+public interface TieredStorageDataIdentifier {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java
new file mode 100644
index 00000000000..21e44719875
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.AbstractID;
+
+/** Utils to convert the Ids to Tiered Storage Ids, or vice versa. */
+public class TieredStorageIdMappingUtils {
+
+    public static TieredStorageTopicId convertId(IntermediateDataSetID 
intermediateDataSetID) {
+        return new TieredStorageTopicId(intermediateDataSetID.getBytes());
+    }
+
+    public static IntermediateDataSetID convertId(TieredStorageTopicId 
topicId) {
+        return new IntermediateDataSetID(new AbstractID(topicId.getBytes()));
+    }
+
+    public static TieredStoragePartitionId convertId(ResultPartitionID 
resultPartitionId) {
+        return new TieredStoragePartitionId(resultPartitionId.getBytes());
+    }
+
+    public static ResultPartitionID convertId(TieredStoragePartitionId 
partitionId) {
+        return new ResultPartitionID(partitionId.getBytes());
+    }
+
+    public static TieredStorageSubpartitionId convertId(int subpartitionId) {
+        return new TieredStorageSubpartitionId(subpartitionId);
+    }
+
+    public static int convertId(TieredStorageSubpartitionId subpartitionId) {
+        return subpartitionId.getSubpartitionId();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStoragePartitionId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStoragePartitionId.java
new file mode 100644
index 00000000000..0d130f59bee
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStoragePartitionId.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Identifier of a partition.
+ *
+ * <p>A partition is equivalent to a result partition in Flink.
+ */
+public class TieredStoragePartitionId extends 
TieredStorageBytesBasedDataIdentifier {
+
+    private static final long serialVersionUID = 1L;
+
+    public TieredStoragePartitionId(byte[] bytes) {
+        super(bytes);
+    }
+
+    @Override
+    public String toString() {
+        return "TieredStoragePartitionId{" + "ID=" + 
StringUtils.byteToHexString(bytes) + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageSubpartitionId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageSubpartitionId.java
new file mode 100644
index 00000000000..2cb0f4f155f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageSubpartitionId.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import java.io.Serializable;
+
+/**
+ * Identifier of a subpartition.
+ *
+ * <p>A subpartition is equivalent to a subpartition in Flink.
+ */
+public class TieredStorageSubpartitionId implements 
TieredStorageDataIdentifier, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int subpartitionId;
+
+    public TieredStorageSubpartitionId(int subpartitionId) {
+        this.subpartitionId = subpartitionId;
+    }
+
+    public int getSubpartitionId() {
+        return subpartitionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return subpartitionId;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+
+        if (that == null || getClass() != that.getClass()) {
+            return false;
+        }
+
+        TieredStorageSubpartitionId thatID = (TieredStorageSubpartitionId) 
that;
+        return subpartitionId == thatID.subpartitionId;
+    }
+
+    @Override
+    public String toString() {
+        return "TieredStorageSubpartitionId{" + "ID=" + subpartitionId + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageTopicId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageTopicId.java
new file mode 100644
index 00000000000..f686a877ee8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageTopicId.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Identifier of a topic.
+ *
+ * <p>A topic is equivalent to an intermediate data set in Flink.
+ */
+public class TieredStorageTopicId extends 
TieredStorageBytesBasedDataIdentifier {
+
+    private static final long serialVersionUID = 1L;
+
+    public TieredStorageTopicId(byte[] bytes) {
+        super(bytes);
+    }
+
+    @Override
+    public String toString() {
+        return "TieredStorageTopicId{" + "ID=" + 
StringUtils.byteToHexString(bytes) + '}';
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtilsTest.java
new file mode 100644
index 00000000000..f8363969af5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageIdMappingUtilsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TieredStorageIdMappingUtils}. */
+public class TieredStorageIdMappingUtilsTest {
+
+    @Test
+    void testConvertDataSetId() {
+        IntermediateDataSetID dataSetID = new IntermediateDataSetID();
+        TieredStorageTopicId topicId = 
TieredStorageIdMappingUtils.convertId(dataSetID);
+        IntermediateDataSetID convertedDataSetID = 
TieredStorageIdMappingUtils.convertId(topicId);
+        assertThat(dataSetID).isEqualTo(convertedDataSetID);
+    }
+
+    @Test
+    void testConvertResultPartitionId() {
+        ResultPartitionID resultPartitionID = new ResultPartitionID();
+        TieredStoragePartitionId tieredStoragePartitionId =
+                TieredStorageIdMappingUtils.convertId(resultPartitionID);
+        ResultPartitionID convertedResultPartitionID =
+                
TieredStorageIdMappingUtils.convertId(tieredStoragePartitionId);
+        assertThat(resultPartitionID).isEqualTo(convertedResultPartitionID);
+    }
+
+    @Test
+    void testConvertSubpartitionId() {
+        int subpartitionId = 2;
+        TieredStorageSubpartitionId tieredStorageSubpartitionId =
+                TieredStorageIdMappingUtils.convertId(subpartitionId);
+        int convertedSubpartitionId =
+                
TieredStorageIdMappingUtils.convertId(tieredStorageSubpartitionId);
+        assertThat(subpartitionId).isEqualTo(convertedSubpartitionId);
+    }
+}

Reply via email to