xintongsong commented on code in PR #22728:
URL: https://github.com/apache/flink/pull/22728#discussion_r1222376428


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierFactory.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/** The memory tier factory implementation for {@link TierFactory}. */
+public class MemoryTierFactory implements TierFactory {
+
+    private final int numBytesPerSegment;
+
+    public MemoryTierFactory(int numBytesPerSegment) {
+        this.numBytesPerSegment = numBytesPerSegment;
+    }
+
+    @Override
+    public TierMasterAgent createMasterAgent(
+            TieredStorageResourceRegistry tieredStorageResourceRegistry) {
+        return new MemoryTierMasterAgent();
+    }
+
+    @Override
+    public TierProducerAgent createProducerAgent(
+            int tierIndex,
+            int numSubpartitions,
+            int bufferSize,
+            ResultPartitionID resultPartitionID,
+            String dataFileBasePath,
+            float minReservedDiskSpaceFraction,
+            boolean isBroadcastOnly,
+            TieredStorageMemoryManager storageMemoryManager,
+            TieredStorageNettyService nettyService,
+            TieredStorageResourceRegistry resourceRegistry) {

Review Comment:
   1. `tierIndex` is not used.
   2. `bufferSize` and `minReservedDiskSpaceFraction` are not per-agent configs. It might be better to set them in the factory constructors.
   3. `ResultPartitionID` should be converted to `TieredStoragePartitionId` outside the class.
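
   Points 2 and 3 could look roughly like the sketch below. This is not the Flink code: `PartitionIdSketch`, `ProducerAgentSketch`, and `MemoryTierFactorySketch` are hypothetical stand-ins for `TieredStoragePartitionId`, `TierProducerAgent`, and `MemoryTierFactory`, and the field names are assumptions. The idea is that config shared by every agent is fixed once in the factory constructor, and the caller passes a partition id that has already been converted.

```java
// Hypothetical stand-in for TieredStoragePartitionId (already converted by the caller).
final class PartitionIdSketch {
    final String value;

    PartitionIdSketch(String value) {
        this.value = value;
    }
}

// Hypothetical stand-in for the producer agent; it only records what it was given.
final class ProducerAgentSketch {
    final PartitionIdSketch partitionId;
    final int numSubpartitions;
    final int bufferSizeBytes;
    final int segmentSizeBytes;

    ProducerAgentSketch(
            PartitionIdSketch partitionId,
            int numSubpartitions,
            int bufferSizeBytes,
            int segmentSizeBytes) {
        this.partitionId = partitionId;
        this.numSubpartitions = numSubpartitions;
        this.bufferSizeBytes = bufferSizeBytes;
        this.segmentSizeBytes = segmentSizeBytes;
    }
}

// Hypothetical factory: tier-wide config lives here, not in createProducerAgent.
final class MemoryTierFactorySketch {
    private final int bufferSizeBytes;
    private final int segmentSizeBytes;

    MemoryTierFactorySketch(int bufferSizeBytes, int segmentSizeBytes) {
        if (segmentSizeBytes < bufferSizeBytes) {
            throw new IllegalArgumentException("One segment contains at least one buffer.");
        }
        this.bufferSizeBytes = bufferSizeBytes;
        this.segmentSizeBytes = segmentSizeBytes;
    }

    // Only per-agent arguments remain in the creation method.
    ProducerAgentSketch createProducerAgent(PartitionIdSketch partitionId, int numSubpartitions) {
        return new ProducerAgentSketch(
                partitionId, numSubpartitions, bufferSizeBytes, segmentSizeBytes);
    }
}
```

   With this shape, a disk-backed factory could take `minReservedDiskSpaceFraction` in its own constructor in the same way, so the memory tier never sees it.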



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierFactory.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+/** The memory tier factory implementation for {@link TierFactory}. */
+public class MemoryTierFactory implements TierFactory {
+
+    private final int numBytesPerSegment;

Review Comment:
   Minor: consider renaming this to `segmentSizeBytes`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierProducerAgent.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;
+
+import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The memory tier implementation of {@link TierProducerAgent}. */
+public class MemoryTierProducerAgent implements TierProducerAgent {
+
+    private final int numBytesPerSegment;
+
+    private final int numBuffersPerSegment;
+
+    private final TieredStorageMemoryManager storageMemoryManager;
+
+    private final boolean isBroadcastOnly;
+
+    // Record the byte number currently written to each sub partition.
+    private final int[] numSubpartitionEmitBytes;
+
+    /**
+     * Each element of the list is all views of the subpartition corresponding to its index, which
+     * are stored in the form of a map that maps consumer id to its subpartition view.
+     */
+    private final boolean[] nettyServiceRegistered;
+
+    private final MemoryTierSubpartitionProducerAgent[] subpartitionProducerAgents;
+
+    public MemoryTierProducerAgent(
+            TieredStoragePartitionId partitionId,
+            int numSubpartitions,
+            int bufferSize,
+            int numBytesPerSegment,
+            TieredStorageMemoryManager storageMemoryManager,
+            boolean isBroadcastOnly,
+            TieredStorageNettyService nettyService,
+            TieredStorageResourceRegistry resourceRegistry) {
+        checkArgument(
+                numBytesPerSegment >= bufferSize, "One segment contains at least one buffer.");
+
+        this.numBytesPerSegment = numBytesPerSegment;
+        this.numBuffersPerSegment = numBytesPerSegment / bufferSize;
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.storageMemoryManager = storageMemoryManager;
+        this.numSubpartitionEmitBytes = new int[numSubpartitions];
+        this.nettyServiceRegistered = new boolean[numSubpartitions];
+        this.subpartitionProducerAgents = new MemoryTierSubpartitionProducerAgent[numSubpartitions];
+
+        Arrays.fill(numSubpartitionEmitBytes, 0);
+        nettyService.registerProducer(partitionId, createNettyServiceProducer());
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
+            subpartitionProducerAgents[subpartitionId] =
+                    new MemoryTierSubpartitionProducerAgent(subpartitionId, nettyService);
+        }
+        resourceRegistry.registerResource(partitionId, this::releaseResources);
+    }
+
+    @Override
+    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId) {
+        if (isBroadcastOnly) {
+            return false;
+        }
+        boolean canStartNewSegment =
+                isSubpartitionRegistered(subpartitionId)
+                        && (storageMemoryManager.getMaxNonReclaimableBuffers(this)
+                                        - storageMemoryManager.numOwnerRequestedBuffer(this))
+                                > numBuffersPerSegment;
+        if (canStartNewSegment) {
+            getSubpartitionProducerAgent(subpartitionId.getSubpartitionId())
+                    .addSegmentBufferContext(segmentId);
+        }
+        return canStartNewSegment;
+    }
+
+    @Override
+    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+        int subpartitionIndex = subpartitionId.getSubpartitionId();
+        if (numSubpartitionEmitBytes[subpartitionIndex] != 0
+                && numSubpartitionEmitBytes[subpartitionIndex] + finishedBuffer.readableBytes()
+                        > numBytesPerSegment) {
+            appendEndOfSegmentEvent(subpartitionIndex);
+            numSubpartitionEmitBytes[subpartitionIndex] = 0;
+            return false;
+        }
+        numSubpartitionEmitBytes[subpartitionIndex] += finishedBuffer.readableBytes();
+        append(finishedBuffer, subpartitionIndex);
+        return true;
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private NettyServiceProducer createNettyServiceProducer() {
+        return new NettyServiceProducer() {
+            @Override
+            public void connectionEstablished(
+                    TieredStorageSubpartitionId subpartitionId,
+                    NettyConnectionWriter nettyConnectionWriter) {
+                MemoryTierProducerAgent.this.register(subpartitionId, nettyConnectionWriter);
+            }
+
+            @Override
+            public void connectionBroken(NettyConnectionId connectionId) {
+                // nothing to do;
+            }
+        };
+    }

Review Comment:
   Instead of creating a new anonymous class, we should make `MemoryTierProducerAgent` implement the `NettyServiceProducer` interface.
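
   A minimal sketch of that shape, assuming simplified types: `ConnectionWriterSketch`, `ServiceProducerSketch`, and `NettyServiceSketch` are hypothetical stand-ins for `NettyConnectionWriter`, `NettyServiceProducer`, and `TieredStorageNettyService`, with ids reduced to `int` and `String`. The agent registers itself as the callback, so the anonymous class and the extra `register` indirection go away.

```java
// Hypothetical stand-in for NettyConnectionWriter.
interface ConnectionWriterSketch {}

// Hypothetical stand-in for NettyServiceProducer, with simplified id types.
interface ServiceProducerSketch {
    void connectionEstablished(int subpartitionId, ConnectionWriterSketch writer);

    void connectionBroken(String connectionId);
}

// Hypothetical stand-in for TieredStorageNettyService; it just remembers the producer.
final class NettyServiceSketch {
    ServiceProducerSketch producer;

    void registerProducer(ServiceProducerSketch producer) {
        this.producer = producer;
    }
}

// The agent is the producer callback itself, so no anonymous class is needed.
final class MemoryTierProducerAgentSketch implements ServiceProducerSketch {
    private final boolean[] connected;

    MemoryTierProducerAgentSketch(int numSubpartitions, NettyServiceSketch nettyService) {
        this.connected = new boolean[numSubpartitions];
        // Register last, after all fields are initialized.
        nettyService.registerProducer(this);
    }

    @Override
    public void connectionEstablished(int subpartitionId, ConnectionWriterSketch writer) {
        // The sketch only tracks the flag; real code would also keep the writer.
        connected[subpartitionId] = true;
    }

    @Override
    public void connectionBroken(String connectionId) {
        // noop
    }

    boolean isSubpartitionConnected(int subpartitionId) {
        return connected[subpartitionId];
    }
}
```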



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierFactory.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;

Review Comment:
   The `.local` package level might not be necessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierMasterAgent.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+/** The memory tier implementation for {@link TierMasterAgent}. */
+public class MemoryTierMasterAgent implements TierMasterAgent {

Review Comment:
   We may introduce a reusable `NoopMasterAgent`.
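
   Something along these lines, as a sketch only: `MasterAgentSketch` is a hypothetical stand-in for `TierMasterAgent` that declares just the `addPartition` method visible in this diff (the real interface may declare more), and the partition id is simplified to a `String`. Since the agent holds no state, a single shared instance could serve every tier that has nothing to do on the master side.

```java
// Hypothetical stand-in for TierMasterAgent; only addPartition is taken from the diff.
interface MasterAgentSketch {
    void addPartition(String partitionId);
}

// Stateless no-op agent that any tier without master-side work could reuse.
final class NoopMasterAgentSketch implements MasterAgentSketch {

    // Safe to share because the class has no fields.
    static final NoopMasterAgentSketch INSTANCE = new NoopMasterAgentSketch();

    private NoopMasterAgentSketch() {}

    @Override
    public void addPartition(String partitionId) {
        // noop
    }
}
```

   A factory's `createMasterAgent` could then return `NoopMasterAgentSketch.INSTANCE` instead of allocating a tier-specific class.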



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierMasterAgent.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;
+
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+
+/** The memory tier implementation for {@link TierMasterAgent}. */
+public class MemoryTierMasterAgent implements TierMasterAgent {
+    @Override
+    public void addPartition(TieredStoragePartitionId partitionId) {
+        // Nothing to do.

Review Comment:
   Minor: the comment could simply read `noop`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/local/memory/MemoryTierProducerAgent.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.local.memory;
+
+import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The memory tier implementation of {@link TierProducerAgent}. */
+public class MemoryTierProducerAgent implements TierProducerAgent {
+
+    private final int numBytesPerSegment;
+
+    private final int numBuffersPerSegment;
+
+    private final TieredStorageMemoryManager storageMemoryManager;
+
+    private final boolean isBroadcastOnly;
+
+    // Record the byte number currently written to each sub partition.
+    private final int[] numSubpartitionEmitBytes;
+
+    /**
+     * Each element of the list is all views of the subpartition corresponding to its index, which
+     * are stored in the form of a map that maps consumer id to its subpartition view.
+     */
+    private final boolean[] nettyServiceRegistered;
+
+    private final MemoryTierSubpartitionProducerAgent[] subpartitionProducerAgents;
+
+    public MemoryTierProducerAgent(
+            TieredStoragePartitionId partitionId,
+            int numSubpartitions,
+            int bufferSize,
+            int numBytesPerSegment,
+            TieredStorageMemoryManager storageMemoryManager,
+            boolean isBroadcastOnly,
+            TieredStorageNettyService nettyService,
+            TieredStorageResourceRegistry resourceRegistry) {
+        checkArgument(
+                numBytesPerSegment >= bufferSize, "One segment contains at least one buffer.");
+
+        this.numBytesPerSegment = numBytesPerSegment;
+        this.numBuffersPerSegment = numBytesPerSegment / bufferSize;
+        this.isBroadcastOnly = isBroadcastOnly;
+        this.storageMemoryManager = storageMemoryManager;
+        this.numSubpartitionEmitBytes = new int[numSubpartitions];
+        this.nettyServiceRegistered = new boolean[numSubpartitions];
+        this.subpartitionProducerAgents = new MemoryTierSubpartitionProducerAgent[numSubpartitions];
+
+        Arrays.fill(numSubpartitionEmitBytes, 0);
+        nettyService.registerProducer(partitionId, createNettyServiceProducer());
+        for (int subpartitionId = 0; subpartitionId < numSubpartitions; ++subpartitionId) {
+            subpartitionProducerAgents[subpartitionId] =
+                    new MemoryTierSubpartitionProducerAgent(subpartitionId, nettyService);
+        }
+        resourceRegistry.registerResource(partitionId, this::releaseResources);
+    }
+
+    @Override
+    public boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int segmentId) {
+        if (isBroadcastOnly) {
+            return false;
+        }
+        boolean canStartNewSegment =
+                isSubpartitionRegistered(subpartitionId)
+                        && (storageMemoryManager.getMaxNonReclaimableBuffers(this)
+                                        - storageMemoryManager.numOwnerRequestedBuffer(this))
+                                > numBuffersPerSegment;
+        if (canStartNewSegment) {
+            getSubpartitionProducerAgent(subpartitionId.getSubpartitionId())
+                    .addSegmentBufferContext(segmentId);
+        }
+        return canStartNewSegment;
+    }
+
+    @Override
+    public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer finishedBuffer) {
+        int subpartitionIndex = subpartitionId.getSubpartitionId();
+        if (numSubpartitionEmitBytes[subpartitionIndex] != 0
+                && numSubpartitionEmitBytes[subpartitionIndex] + finishedBuffer.readableBytes()
+                        > numBytesPerSegment) {
+            appendEndOfSegmentEvent(subpartitionIndex);
+            numSubpartitionEmitBytes[subpartitionIndex] = 0;
+            return false;
+        }
+        numSubpartitionEmitBytes[subpartitionIndex] += finishedBuffer.readableBytes();
+        append(finishedBuffer, subpartitionIndex);
+        return true;
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do
+    }
+
+    // ------------------------------------------------------------------------
+    //  Internal Methods
+    // ------------------------------------------------------------------------
+
+    private NettyServiceProducer createNettyServiceProducer() {
+        return new NettyServiceProducer() {
+            @Override
+            public void connectionEstablished(
+                    TieredStorageSubpartitionId subpartitionId,
+                    NettyConnectionWriter nettyConnectionWriter) {
+                MemoryTierProducerAgent.this.register(subpartitionId, nettyConnectionWriter);
+            }
+
+            @Override
+            public void connectionBroken(NettyConnectionId connectionId) {
+                // nothing to do;
+            }
+        };
+    }
+
+    private void register(

Review Comment:
   The method name is confusing. We should keep the terminology consistent with `connectionEstablished`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
