[ 
https://issues.apache.org/jira/browse/BEAM-6650?focusedWorklogId=198739&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-198739
 ]

ASF GitHub Bot logged work on BEAM-6650:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Feb/19 15:31
            Start Date: 14/Feb/19 15:31
    Worklog Time Spent: 10m 
      Work Description: aljoscha commented on pull request #7810: [BEAM-6650] 
Add bundle test with checkpointing for keyed processing
URL: https://github.com/apache/beam/pull/7810#discussion_r256883235
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkKeyGroupStateInternals.java
 ##########
 @@ -106,337 +80,150 @@ public K getKey() {
   @Override
   public <T extends State> T state(
       final StateNamespace namespace, StateTag<T> address, final 
StateContext<?> context) {
-
-    return address.bind(
-        new StateTag.StateBinder() {
-
-          @Override
-          public <T2> ValueState<T2> bindValue(StateTag<ValueState<T2>> 
address, Coder<T2> coder) {
-            throw new UnsupportedOperationException(
-                String.format("%s is not supported", 
ValueState.class.getSimpleName()));
-          }
-
-          @Override
-          public <T2> BagState<T2> bindBag(StateTag<BagState<T2>> address, 
Coder<T2> elemCoder) {
-
-            return new FlinkKeyGroupBagState<>(address, namespace, elemCoder);
-          }
-
-          @Override
-          public <T2> SetState<T2> bindSet(StateTag<SetState<T2>> address, 
Coder<T2> elemCoder) {
-            throw new UnsupportedOperationException(
-                String.format("%s is not supported", 
SetState.class.getSimpleName()));
-          }
-
-          @Override
-          public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-              StateTag<MapState<KeyT, ValueT>> spec,
-              Coder<KeyT> mapKeyCoder,
-              Coder<ValueT> mapValueCoder) {
-            throw new UnsupportedOperationException(
-                String.format("%s is not supported", 
MapState.class.getSimpleName()));
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT>
-              CombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-            throw new UnsupportedOperationException("bindCombiningValue is not 
supported.");
-          }
-
-          @Override
-          public <InputT, AccumT, OutputT>
-              CombiningState<InputT, AccumT, OutputT> 
bindCombiningValueWithContext(
-                  StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-                  Coder<AccumT> accumCoder,
-                  CombineWithContext.CombineFnWithContext<InputT, AccumT, 
OutputT> combineFn) {
-            throw new UnsupportedOperationException(
-                "bindCombiningValueWithContext is not supported.");
-          }
-
-          @Override
-          public WatermarkHoldState bindWatermark(
-              StateTag<WatermarkHoldState> address, TimestampCombiner 
timestampCombiner) {
-            throw new UnsupportedOperationException(
-                String.format("%s is not supported", 
CombiningState.class.getSimpleName()));
-          }
-        });
-  }
-
-  /**
-   * Reference from {@link Combine.CombineFn}.
-   *
-   * <p>Accumulators are stored in each KeyGroup, call addInput() when a 
element comes, call
-   * extractOutput() to produce the desired value when need to read data.
-   */
-  interface KeyGroupCombiner<InputT, AccumT, OutputT> {
-
-    /**
-     * Returns a new, mutable accumulator value, representing the accumulation 
of zero input values.
-     */
-    AccumT createAccumulator();
-
-    /** Adds the given input value to the given accumulator, returning the new 
accumulator value. */
-    AccumT addInput(AccumT accumulator, InputT input);
-
-    /**
-     * Returns the output value that is the result of all accumulators from 
KeyGroups that are
-     * assigned to this operator.
-     */
-    OutputT extractOutput(Iterable<AccumT> accumulators);
-  }
-
-  private abstract class AbstractKeyGroupState<InputT, AccumT, OutputT> {
-
-    private String stateName;
-    private String namespace;
-    private Coder<AccumT> coder;
-    private KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner;
-
-    AbstractKeyGroupState(
-        String stateName,
-        String namespace,
-        Coder<AccumT> coder,
-        KeyGroupCombiner<InputT, AccumT, OutputT> keyGroupCombiner) {
-      this.stateName = stateName;
-      this.namespace = namespace;
-      this.coder = coder;
-      this.keyGroupCombiner = keyGroupCombiner;
-    }
-
-    /** Choose keyGroup of input and addInput to accumulator. */
-    void addInput(InputT input) {
-      int keyGroupIdx = keyedStateBackend.getCurrentKeyGroupIndex();
-      int localIdx = getIndexForKeyGroup(keyGroupIdx);
-      Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = 
stateTables[localIdx];
-      Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-      if (tuple2 == null) {
-        tuple2 = new Tuple2<>();
-        tuple2.f0 = coder;
-        tuple2.f1 = new HashMap<>();
-        stateTable.put(stateName, tuple2);
-      }
-      Map<String, AccumT> map = (Map<String, AccumT>) tuple2.f1;
-      AccumT accumulator = map.get(namespace);
-      if (accumulator == null) {
-        accumulator = keyGroupCombiner.createAccumulator();
-      }
-      accumulator = keyGroupCombiner.addInput(accumulator, input);
-      map.put(namespace, accumulator);
-    }
-
-    /** Get all accumulators and invoke extractOutput(). */
-    OutputT extractOutput() {
-      List<AccumT> accumulators = new ArrayList<>(stateTables.length);
-      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : 
stateTables) {
-        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-        if (tuple2 != null) {
-          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
-          if (accumulator != null) {
-            accumulators.add(accumulator);
-          }
-        }
-      }
-      return keyGroupCombiner.extractOutput(accumulators);
-    }
-
-    /** Find the first accumulator and return immediately. */
-    boolean isEmptyInternal() {
-      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : 
stateTables) {
-        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-        if (tuple2 != null) {
-          AccumT accumulator = (AccumT) tuple2.f1.get(namespace);
-          if (accumulator != null) {
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    /** Clear accumulators and clean empty map. */
-    void clearInternal() {
-      for (Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable : 
stateTables) {
-        Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
-        if (tuple2 != null) {
-          tuple2.f1.remove(namespace);
-          if (tuple2.f1.isEmpty()) {
-            stateTable.remove(stateName);
-          }
-        }
-      }
-    }
-  }
-
-  private int getIndexForKeyGroup(int keyGroupIdx) {
-    checkArgument(
-        localKeyGroupRange.contains(keyGroupIdx),
-        "Key Group " + keyGroupIdx + " does not belong to the local range.");
-    return keyGroupIdx - this.localKeyGroupRangeStartIdx;
-  }
-
-  private static class KeyGroupBagCombiner<T> implements KeyGroupCombiner<T, 
List<T>, Iterable<T>> {
-
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public Iterable<T> extractOutput(Iterable<List<T>> accumulators) {
-      List<T> result = new ArrayList<>();
-      // maybe can return an unmodifiable view.
-      for (List<T> list : accumulators) {
-        result.addAll(list);
-      }
-      return result;
-    }
+    return address
 
 Review comment:
   thanks!
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 198739)
    Time Spent: 3h  (was: 2h 50m)

> FlinkRunner fails to checkpoint elements emitted during finishBundle
> --------------------------------------------------------------------
>
>                 Key: BEAM-6650
>                 URL: https://issues.apache.org/jira/browse/BEAM-6650
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.11.0
>
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Elements emitted during the finalizeBundle call in snapshotState are lost 
> after the pipeline is restored. This only happens when the operator is keyed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to