gaoyunhaii commented on a change in pull request #18:
URL: https://github.com/apache/flink-ml/pull/18#discussion_r744108746



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java
##########
@@ -43,6 +43,10 @@
     private final List<Segment> finishSegments;
 
     private SegmentWriter currentSegment;
+    /**
+     * The segments that are newly added and have not yet been retrieved by
+     * getNewlyFinishedSegments().
+     */
+    private final List<Segment> newlyFinishedSegments;

Review comment:
        Do we still need `newlyFinishedSegments` if, on initialization, we could also include the pending segments in the `finishSegments` of the new writers?
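        If we went that way, the new writer could roughly look like the sketch below. This is only an illustration: apart from `finishSegments` and `currentSegment`, the field and parameter names (e.g. `priorFinishedSegments`, the `SegmentWriter` construction) are assumptions rather than the current API.

```java
// Hypothetical sketch: seed finishSegments with the segments carried over from
// the previous writer, so callers read everything via getFinishSegments() and
// no separate newlyFinishedSegments bookkeeping is needed.
public DataCacheWriter(
        TypeSerializer<T> serializer,
        FileSystem fileSystem,
        SupplierWithException<Path, IOException> pathGenerator,
        List<Segment> priorFinishedSegments)
        throws IOException {
    this.serializer = serializer;
    this.fileSystem = fileSystem;
    this.pathGenerator = pathGenerator;
    // All previously finished segments become part of finishSegments directly.
    this.finishSegments = new ArrayList<>(priorFinishedSegments);
    // Assumed: a fresh segment writer is opened for new records.
    this.currentSegment = new SegmentWriter(pathGenerator.get());
}
```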

##########
File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java
##########
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.broadcast.operator;
+
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.iteration.datacache.nonkeyed.DataCacheReader;
+import org.apache.flink.iteration.datacache.nonkeyed.DataCacheSnapshot;
+import org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter;
+import org.apache.flink.iteration.datacache.nonkeyed.Segment;
+import org.apache.flink.iteration.operator.OperatorUtils;
+import org.apache.flink.iteration.proxy.state.ProxyStreamOperatorStateContext;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.ml.common.broadcast.BroadcastContext;
+import org.apache.flink.ml.common.broadcast.BroadcastStreamingRuntimeContext;
+import org.apache.flink.ml.common.broadcast.typeinfo.CacheElement;
+import org.apache.flink.ml.common.broadcast.typeinfo.CacheElementTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
+import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
+import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+
+/** Base class for the broadcast wrapper operators. */
+public abstract class AbstractBroadcastWrapperOperator<T, S extends StreamOperator<T>>
+        implements StreamOperator<T>, StreamOperatorStateHandler.CheckpointedStreamOperator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractBroadcastWrapperOperator.class);
+
+    protected final StreamOperatorParameters<T> parameters;
+
+    protected final StreamConfig streamConfig;
+
+    protected final StreamTask<?, ?> containingTask;
+
+    protected final Output<StreamRecord<T>> output;
+
+    protected final StreamOperatorFactory<T> operatorFactory;
+
+    protected final OperatorMetricGroup metrics;
+
+    protected final S wrappedOperator;
+
+    protected transient StreamOperatorStateHandler stateHandler;
+
+    protected transient InternalTimeServiceManager<?> timeServiceManager;
+
+    protected final MailboxExecutor mailboxExecutor;
+
+    /** variables specific for withBroadcast functionality. */
+    protected final String[] broadcastStreamNames;
+
+    /**
+     * whether each input is blocked. Inputs with broadcast variables can only process their input
+     * records after broadcast variables are ready. One input is non-blocked if it can consume its
+     * inputs (by caching) when broadcast variables are not ready. Otherwise it has to block the
+     * processing and wait until the broadcast variables are ready to be accessed.
+     */
+    protected final boolean[] isBlocked;
+
+    /** type information of each input. */
+    protected final TypeInformation<?>[] inTypes;
+
+    /** whether all broadcast variables of this operator are ready. */
+    protected boolean broadcastVariablesReady;
+
+    /** index of this subtask. */
+    protected final transient int indexOfSubtask;
+
+    /** number of the inputs of this operator. */
+    protected final int numInputs;
+
+    /** runtimeContext of the rich function in wrapped operator. */
+    BroadcastStreamingRuntimeContext wrappedOperatorRuntimeContext;
+
+    /**
+     * path of the file used to store the cached records. It could be on a local file system or a
+     * remote file system.
+     */
+    private Path basePath;
+
+    /** file system. */
+    protected FileSystem fileSystem;
+
+    /** DataCacheWriter for each input. */
+    @SuppressWarnings("rawtypes")
+    protected DataCacheWriter[] dataCacheWriters;
+
+    /** segment list for each input. */
+    protected List<Segment>[] segmentLists;
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    AbstractBroadcastWrapperOperator(
+            StreamOperatorParameters<T> parameters,
+            StreamOperatorFactory<T> operatorFactory,
+            String[] broadcastStreamNames,
+            TypeInformation<?>[] inTypes,
+            boolean[] isBlocked) {
+        this.parameters = Objects.requireNonNull(parameters);
+        this.streamConfig = Objects.requireNonNull(parameters.getStreamConfig());
+        this.containingTask = Objects.requireNonNull(parameters.getContainingTask());
+        this.output = Objects.requireNonNull(parameters.getOutput());
+        this.operatorFactory = Objects.requireNonNull(operatorFactory);
+        this.metrics = createOperatorMetricGroup(containingTask.getEnvironment(), streamConfig);
+        this.wrappedOperator =
+                (S)
+                        StreamOperatorFactoryUtil.<T, S>createOperator(
+                                        operatorFactory,
+                                        (StreamTask) containingTask,
+                                        streamConfig,
+                                        output,
+                                        parameters.getOperatorEventDispatcher())
+                                .f0;
+
+        boolean hasRichFunction =
+                wrappedOperator instanceof AbstractUdfStreamOperator
+                        && ((AbstractUdfStreamOperator) wrappedOperator).getUserFunction()
+                                instanceof RichFunction;
+
+        if (hasRichFunction) {
+            wrappedOperatorRuntimeContext =
+                    new BroadcastStreamingRuntimeContext(
+                            containingTask.getEnvironment(),
+                            containingTask.getEnvironment().getAccumulatorRegistry().getUserMap(),
+                            wrappedOperator.getMetricGroup(),
+                            wrappedOperator.getOperatorID(),
+                            ((AbstractUdfStreamOperator) wrappedOperator)
+                                    .getProcessingTimeService(),
+                            null,
+                            containingTask.getEnvironment().getExternalResourceInfoProvider());
+
+            ((RichFunction) ((AbstractUdfStreamOperator) wrappedOperator).getUserFunction())
+                    .setRuntimeContext(wrappedOperatorRuntimeContext);
+        } else {
+            throw new RuntimeException(
+                    "The operator is not an instance of "
+                            + AbstractUdfStreamOperator.class.getSimpleName()
+                            + " that contains a "
+                            + RichFunction.class.getSimpleName());
+        }
+
+        this.mailboxExecutor =
+                containingTask.getMailboxExecutorFactory().createExecutor(TaskMailbox.MIN_PRIORITY);
+        // variables specific for withBroadcast functionality.
+        this.broadcastStreamNames = broadcastStreamNames;
+        this.isBlocked = isBlocked;
+        this.inTypes = inTypes;
+        this.broadcastVariablesReady = false;
+        this.indexOfSubtask = containingTask.getIndexInSubtaskGroup();
+        this.numInputs = inTypes.length;
+
+        // puts in mailboxExecutor
+        for (String name : broadcastStreamNames) {
+            BroadcastContext.putMailBoxExecutor(name + "-" + indexOfSubtask, mailboxExecutor);
+        }
+
+        basePath =
+                OperatorUtils.getDataCachePath(
+                        containingTask.getEnvironment().getTaskManagerInfo().getConfiguration(),
+                        containingTask
+                                .getEnvironment()
+                                .getIOManager()
+                                .getSpillingDirectoriesPaths());
+        try {
+            fileSystem = basePath.getFileSystem();
+            dataCacheWriters = new DataCacheWriter[numInputs];
+            for (int i = 0; i < numInputs; i++) {
+                dataCacheWriters[i] =
+                        new DataCacheWriter(
+                                new CacheElementTypeInfo<>(inTypes[i])
+                                        .createSerializer(containingTask.getExecutionConfig()),
+                                fileSystem,
+                                () ->

Review comment:
       Also see `OperatorUtils#createDataCacheFileGenerator`
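       For illustration, the writer construction above could then look roughly like the sketch below; the argument list of `createDataCacheFileGenerator` (the "cache" file-type name and the operator id taken from `streamConfig`) is an assumption based on how the generator is used in the iteration module, not copied from its definition.

```java
// Sketch only: reuse the shared data-cache path generator instead of an inline lambda.
dataCacheWriters[i] =
        new DataCacheWriter(
                new CacheElementTypeInfo<>(inTypes[i])
                        .createSerializer(containingTask.getExecutionConfig()),
                fileSystem,
                // Assumed parameters: base path, a file-type name, and the operator id.
                OperatorUtils.createDataCacheFileGenerator(
                        basePath, "cache", streamConfig.getOperatorID()));
```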

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java
##########
@@ -43,6 +43,10 @@
     private final List<Segment> finishSegments;
 
     private SegmentWriter currentSegment;
+    /**

Review comment:
       Add empty line.

##########
File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastStreamingRuntimeContext.java
##########
@@ -0,0 +1,68 @@
+package org.apache.flink.ml.common.broadcast;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A subclass of {@link StreamingRuntimeContext} that provides access to broadcast variables.
+ */
+public class BroadcastStreamingRuntimeContext extends StreamingRuntimeContext {
+
+    Map<String, List<?>> broadcastVariables = new HashMap<>();
+
+    public BroadcastStreamingRuntimeContext(
+            Environment env,
+            Map<String, Accumulator<?, ?>> accumulators,
+            OperatorMetricGroup operatorMetricGroup,
+            OperatorID operatorID,
+            ProcessingTimeService processingTimeService,
+            @Nullable KeyedStateStore keyedStateStore,
+            ExternalResourceInfoProvider externalResourceInfoProvider) {
+        super(
+                env,
+                accumulators,
+                operatorMetricGroup,
+                operatorID,
+                processingTimeService,
+                keyedStateStore,
+                externalResourceInfoProvider);
+    }
+
+    @Override
+    public boolean hasBroadcastVariable(String name) {
+        return broadcastVariables.containsKey(name);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <RT> List<RT> getBroadcastVariable(String name) {
+        return (List<RT>) broadcastVariables.get(name);

Review comment:
       Do we need to consider the case where users call `getBroadcastVariable` before all the elements of the broadcast variables have been received?
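       If so, one possible shape of a guard is sketched below; the `Preconditions.checkState` call and its message are illustrative only, not a proposed final design (an alternative would be to block until the variable is ready).

```java
@Override
@SuppressWarnings("unchecked")
public <RT> List<RT> getBroadcastVariable(String name) {
    // Illustrative guard: fail fast if the broadcast variable has not been fully
    // materialized yet, instead of returning null or a partially received list.
    org.apache.flink.util.Preconditions.checkState(
            broadcastVariables.containsKey(name),
            "Broadcast variable %s is not ready; it can only be accessed after "
                    + "all of its elements have been received.",
            name);
    return (List<RT>) broadcastVariables.get(name);
}
```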

##########
File path: flink-ml-lib/src/test/resources/log4j2-test.properties
##########
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO

Review comment:
       Should be `OFF`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

