KurtYoung commented on a change in pull request #8086: 
[FLINK-12062][table-runtime-blink] Introduce bundle operator to streaming table 
runtime
URL: https://github.com/apache/flink/pull/8086#discussion_r271575050
 
 

 ##########
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/bundle/AbstractMapBundleOperator.java
 ##########
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.bundle;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.table.runtime.bundle.trigger.BundleTrigger;
+import org.apache.flink.table.runtime.bundle.trigger.BundleTriggerCallback;
+import org.apache.flink.table.runtime.context.ExecutionContextImpl;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link AbstractMapBundleOperator} simply uses a java Map to store the input elements
+ * in key-value form. The map key is typically the same as the state key, so we can do some
+ * optimizations before accessing states, like pre-aggregating values for each key. We then
+ * only need to access state once per key, instead of once per element processed.
+ *
+ * <p>NOTE: if all the elements we process have distinct keys, this operator will only increase
+ * the memory footprint and will not bring any performance improvement.
+ *
+ * @param <K>   The type of the key in the bundle map
+ * @param <V>   The type of the value in the bundle map
+ * @param <IN>  Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ */
+public abstract class AbstractMapBundleOperator<K, V, IN, OUT>
+               extends AbstractStreamOperator<OUT>
+               implements OneInputStreamOperator<IN, OUT>, 
BundleTriggerCallback {
+
+       private static final long serialVersionUID = 5081841938324118594L;
+
+       /** The map in heap to store elements. */
+       private final transient Map<K, V> bundle;
+
+       /** The trigger that determines how many elements should be put into a 
bundle. */
+       private final BundleTrigger<IN> bundleTrigger;
+
+       /** The function used to process when receiving element. */
+       private final MapBundleFunction<K, V, IN, OUT> function;
+
+       /** Output for stream records. */
+       private transient Collector<OUT> collector;
+
+       private transient int numOfElements = 0;
+
+       private transient volatile boolean isInFinishingBundle = false;
+       private transient Object checkpointingLock;
+
+       protected AbstractMapBundleOperator(
+                       MapBundleFunction<K, V, IN, OUT> function,
+                       BundleTrigger<IN> bundleTrigger) {
+               chainingStrategy = ChainingStrategy.ALWAYS;
+               this.bundle = new HashMap<>();
+               this.function = checkNotNull(function, "function is null");
+               this.bundleTrigger = checkNotNull(bundleTrigger, "bundleTrigger 
is null");
+       }
+
+       @Override
+       public void setup(
+                       StreamTask<?, ?> containingTask,
+                       StreamConfig config,
+                       Output<StreamRecord<OUT>> output) {
+               super.setup(containingTask, config, output);
+               this.checkpointingLock = 
getContainingTask().getCheckpointLock();
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               function.open(new ExecutionContextImpl(this, 
getRuntimeContext()));
+
+               this.numOfElements = 0;
+               this.collector = new StreamRecordCollector<>(output);
+
+               bundleTrigger.registerCallback(this);
+               // reset trigger
+               bundleTrigger.reset();
+               LOG.info("BundleOperator's trigger info: " + 
bundleTrigger.explain());
+
+               // counter metric to get the size of bundle
+               getRuntimeContext().getMetricGroup().gauge("bundleSize", 
(Gauge<Integer>) () -> numOfElements);
+               getRuntimeContext().getMetricGroup().gauge("bundleRatio", 
(Gauge<Double>) () -> {
+                       int numOfKeys = bundle.size();
+                       if (numOfKeys == 0) {
+                               return 0.0;
+                       } else {
+                               return 1.0 * numOfElements / numOfKeys;
+                       }
+               });
+       }
+
	/**
	 * Buffers the incoming element into the in-heap bundle map, merging it with
	 * any value already buffered for the same key, then notifies the trigger.
	 *
	 * @param element the input record to add to the current bundle
	 * @throws Exception if key extraction, the bundle function, or the trigger fails
	 */
	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		// If a bundle flush is in progress, block until it completes. The while
		// loop guards against spurious wakeups of Object.wait().
		// NOTE(review): this assumes the checkpoint lock is held when
		// processElement is invoked (otherwise wait() would throw
		// IllegalMonitorStateException) — confirm against the task invariants.
		while (isInFinishingBundle) {
			checkpointingLock.wait();
		}

		// get the key and value for the map bundle
		// (bundleValue is null when this is the first element for the key)
		final IN input = element.getValue();
		final K bundleKey = getKey(input);
		final V bundleValue = bundle.get(bundleKey);

		// get a new value after adding this element to bundle
		final V newBundleValue = function.addInput(bundleValue, input);

		// update to map bundle
		bundle.put(bundleKey, newBundleValue);

		numOfElements++;
		// the trigger may call back into finishBundle() from onElement()
		bundleTrigger.onElement(input);
	}
+
	/**
	 * Gets the key for the current processing element, which will be used as
	 * the map bundle's key. The key should typically match the state key so
	 * that per-key pre-aggregation pays off when state is accessed.
	 *
	 * @param input the element currently being processed
	 * @return the bundle-map key extracted from the element
	 * @throws Exception if key extraction fails
	 */
	protected abstract K getKey(final IN input) throws Exception;
+
+       @Override
+       public void finishBundle() throws Exception {
+               while (isInFinishingBundle) {
+                       checkpointingLock.wait();
+               }
+               isInFinishingBundle = true;
 
 Review comment:
   Yes, I will add more comments to explain why we need this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to