xinyuiscool commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037689857
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -148,7 +149,8 @@ public DoFnOp(
JobInfo jobInfo,
Map<String, TupleTag<?>> idToTupleTagMap,
DoFnSchemaInformation doFnSchemaInformation,
- Map<?, PCollectionView<?>> sideInputMapping) {
+ Map<?, PCollectionView<?>> sideInputMapping,
+ Map<String, String> userStateIds) {
Review Comment:
stateIdToStoreMapping? A meaningful name will help readers understand your logic.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,14 @@ public SamzaPipelineResult run(Pipeline pipeline) {
LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final Map<String, String> multiParDoStateIdMap = new HashMap<>();
Review Comment:
The current implementation, which invokes createConfig() to populate this map and
then calls rewrite() to remove some configs and add new ones, is quite awkward.
A more elegant implementation would follow the existing practice used to build
the viewToId map: do a pre-scan of the pipeline to create an immutable
nonUniqueStateIds set. I expect the code to look like the following:
// Do a pre-scan of the pipeline to build this set; it stays immutable afterwards.
final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
// Create the store configs in one shot, with no rewriting business.
SamzaPipelineTranslator.createConfig(
    pipeline, options, idMap, nonUniqueStateIds, configBuilder);
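A minimal sketch of the suggested pre-scan, assuming each stateful ParDo has already been reduced to its full transform name plus the state ids its DoFn declares. `StateIdScanSketch` and `findNonUniqueStateIds` are hypothetical stand-ins for the `StateIdParser.scan(pipeline)` call above; a real version would collect these pairs while traversing the pipeline (e.g. with `Pipeline.traverseTopologically`) and reading `DoFnSignature.stateDeclarations()`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the pre-scan; not an existing Beam class.
final class StateIdScanSketch {
  private StateIdScanSketch() {}

  /**
   * Returns the state ids declared by more than one ParDo.
   *
   * @param stateIdsByParDo full ParDo name -> state ids declared by that ParDo's DoFn
   */
  static Set<String> findNonUniqueStateIds(Map<String, List<String>> stateIdsByParDo) {
    // Count how many distinct ParDos declare each state id.
    final Map<String, Integer> parDoCount = new HashMap<>();
    for (List<String> stateIds : stateIdsByParDo.values()) {
      // A single DoFn cannot declare the same id twice, but dedupe defensively.
      for (String stateId : new HashSet<>(stateIds)) {
        parDoCount.merge(stateId, 1, Integer::sum);
      }
    }
    final Set<String> nonUnique = new HashSet<>();
    parDoCount.forEach(
        (stateId, count) -> {
          if (count > 1) {
            nonUnique.add(stateId);
          }
        });
    // Computed once up front and never modified afterwards.
    return Collections.unmodifiableSet(nonUnique);
  }
}
```

Because the set is complete before any config is generated, every later decision about store names can be made exactly once.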
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
TaskContext context,
SamzaPipelineOptions pipelineOptions,
ExecutableStage executableStage) {
-
- Set<String> stateIds =
+ // TODO: handle same stateIds in multiple ParDos for portable mode
+ Map<String, String> stateIds =
executableStage.getUserStates().stream()
- .map(UserStateReference::localName)
- .collect(Collectors.toSet());
+ .collect(
+ Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
}
@SuppressWarnings("unchecked")
- private static <K> Factory<K> createStateInternalsFactory(
+ static <K> Factory<K> createStateInternalsFactory(
String id,
@Nullable Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- Collection<String> stateIds) {
+ Map<String, String> stateIdMap) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
-
final Coder<K> stateKeyCoder;
if (keyCoder != null) {
- stateIds.forEach(
- stateId ->
- stores.put(
- stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+ stateIdMap
Review Comment:
We don't need this stateIdMap. We should still keep the stores map as
<stateId -> store>. There will be no duplicate ids within a single DoFn.
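A minimal sketch of what this suggests, assuming the translator hands the runtime an immutable stateId -> storeId mapping (the `stateIdToStoreMapping` name proposed earlier). The `stores` map stays keyed by the DoFn's own state ids; only the physical store opened for each id changes. `openStore` stands in for `TaskContext.getStore`, `Object` stands in for `KeyValueStore<ByteArray, StateValue<?>>`, and `"beamStore"` as the value of `BEAM_STORE` is an assumption, so the sketch runs without Samza.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; Object stands in for KeyValueStore<ByteArray, StateValue<?>>.
final class StoresSketch {
  // Assumed name of the shared store for system state.
  static final String BEAM_STORE = "beamStore";

  private StoresSketch() {}

  static Map<String, Object> createStores(
      Iterable<String> stateIds,
      Map<String, String> stateIdToStoreMapping,
      Function<String, Object> openStore) {
    final Map<String, Object> stores = new HashMap<>();
    stores.put(BEAM_STORE, openStore.apply(BEAM_STORE));
    for (String stateId : stateIds) {
      // The key stays the stateId; the storeId only decides which physical store backs it.
      final String storeId = stateIdToStoreMapping.getOrDefault(stateId, stateId);
      stores.put(stateId, openStore.apply(storeId));
    }
    return stores;
  }
}
```

Since the keys are unchanged, nothing downstream (the state internals, the factory, `AbstractSamzaState`) needs to know that a renaming happened.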
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
// the stores include both beamStore for system states as well as stores for user state
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ // Map of non-system stateIds to unique storeId
+ private final Map<String, String> stateIdMap;
Review Comment:
Then change the name to exactly what your comment says: stateIdToStoreMap.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
// the stores include both beamStore for system states as well as stores for user state
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ // Map of non-system stateIds to unique storeId
+ private final Map<String, String> stateIdMap;
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
private final String stageId;
private SamzaStoreStateInternals(
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+ Map<String, String> stateIds,
Review Comment:
We don't need this.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -92,14 +90,14 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping,
OpEmitter emitter,
- FutureCollector futureCollector) {
+ FutureCollector futureCollector,
+ Map<String, String> userStateIds) {
Review Comment:
Same as above: use a better name.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ private final Map<String, String> stateIdMap;
Review Comment:
remove.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
TaskContext context,
SamzaPipelineOptions pipelineOptions,
ExecutableStage executableStage) {
-
- Set<String> stateIds =
+ // TODO: handle same stateIds in multiple ParDos for portable mode
+ Map<String, String> stateIds =
executableStage.getUserStates().stream()
- .map(UserStateReference::localName)
- .collect(Collectors.toSet());
+ .collect(
+ Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
}
@SuppressWarnings("unchecked")
- private static <K> Factory<K> createStateInternalsFactory(
+ static <K> Factory<K> createStateInternalsFactory(
String id,
@Nullable Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- Collection<String> stateIds) {
+ Map<String, String> stateIdMap) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
-
final Coder<K> stateKeyCoder;
if (keyCoder != null) {
- stateIds.forEach(
- stateId ->
- stores.put(
- stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+ stateIdMap
+ .values()
+ .forEach(
+ storeId ->
+ stores.put(
Review Comment:
Wrong. Keep the stores map the same: {stateId -> store}.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
// the stores include both beamStore for system states as well as stores for user state
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ // Map of non-system stateIds to unique storeId
+ private final Map<String, String> stateIdMap;
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
private final String stageId;
private SamzaStoreStateInternals(
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
Review Comment:
stores should still map each stateId to the actual store. There is no need to
add an extra layer of indirection here.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ private final Map<String, String> stateIdMap;
private final Coder<K> keyCoder;
private final int batchGetSize;
public Factory(
String stageId,
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+ Map<String, String> stateIdMap,
Review Comment:
remove
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -303,7 +298,8 @@ public StateInternals stateInternalsForKey(@Nullable K key) {
throw new RuntimeException("Cannot encode key for state store", e);
}
- return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), stageId, batchGetSize);
+ return new SamzaStoreStateInternals<>(
Review Comment:
Revert the change.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
TaskContext context,
SamzaPipelineOptions pipelineOptions,
ExecutableStage executableStage) {
-
- Set<String> stateIds =
+ // TODO: handle same stateIds in multiple ParDos for portable mode
+ Map<String, String> stateIds =
executableStage.getUserStates().stream()
- .map(UserStateReference::localName)
- .collect(Collectors.toSet());
+ .collect(
+ Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
}
@SuppressWarnings("unchecked")
- private static <K> Factory<K> createStateInternalsFactory(
+ static <K> Factory<K> createStateInternalsFactory(
String id,
@Nullable Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- Collection<String> stateIds) {
+ Map<String, String> stateIdMap) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
-
final Coder<K> stateKeyCoder;
if (keyCoder != null) {
- stateIds.forEach(
- stateId ->
- stores.put(
- stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+ stateIdMap
+ .values()
+ .forEach(
+ storeId ->
+ stores.put(
+ storeId,
+ (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(storeId)));
stateKeyCoder = keyCoder;
} else {
stateKeyCoder = (Coder<K>) VoidCoder.of();
}
- return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
+ return new Factory<>(Objects.toString(id), stores, stateIdMap, stateKeyCoder, batchGetSize);
Review Comment:
get rid of stateIdMap.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
this.coder = coder;
this.namespace = namespace;
this.addressId = address.getId();
- this.isBeamStore = !stores.containsKey(address.getId());
+ this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));
Review Comment:
Revert the change.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ private final Map<String, String> stateIdMap;
private final Coder<K> keyCoder;
private final int batchGetSize;
public Factory(
String stageId,
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+ Map<String, String> stateIdMap,
Coder<K> keyCoder,
int batchGetSize) {
this.stageId = stageId;
this.stores = stores;
+ this.stateIdMap = stateIdMap;
Review Comment:
remove
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +35,17 @@ public class ConfigContext {
private final Map<PValue, String> idMap;
private AppliedPTransform<?, ?, ?> currentTransform;
private final SamzaPipelineOptions options;
- private final Set<String> stateIds;
+ private final Map<String, String> usedStateIdMap;
+ private final Map<String, String> stateIdsToRewrite;
- public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions options) {
+ public ConfigContext(
Review Comment:
Remove all the changes in this class. Do a pre-scan instead.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
this.coder = coder;
this.namespace = namespace;
this.addressId = address.getId();
- this.isBeamStore = !stores.containsKey(address.getId());
+ this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));
this.store =
isBeamStore
? (KeyValueStore) stores.get(BEAM_STORE)
- : (KeyValueStore) stores.get(address.getId());
+ : (KeyValueStore) stores.get(stateIdMap.get(address.getId()));
Review Comment:
Revert the change.
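With `stores` keyed by stateId, the lookup in `AbstractSamzaState` needs no indirection, which is why the original `-` lines above can stay as they were. A self-contained sketch of that resolution, using a hypothetical `Object` stand-in for the store type and an assumed `"beamStore"` key for the shared store:

```java
import java.util.Map;

// Hypothetical sketch of how a state address resolves to a store.
final class StoreResolutionSketch {
  static final String BEAM_STORE = "beamStore"; // assumed key of the shared system store

  private StoreResolutionSketch() {}

  static Object resolveStore(Map<String, Object> stores, String addressId) {
    // Ids without a dedicated store (e.g. system state) fall back to the Beam store.
    final boolean isBeamStore = !stores.containsKey(addressId);
    return isBeamStore ? stores.get(BEAM_STORE) : stores.get(addressId);
  }
}
```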
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -64,8 +69,39 @@ public SamzaPipelineOptions getPipelineOptions() {
return this.options;
}
- public boolean addStateId(String stateId) {
- return stateIds.add(stateId);
+ /** Helper to keep track of used stateIds and return unique store id. */
+ public String getUniqueStoreId(String stateId, String parDoName) {
+ // Update a map of used state id with parDo name.
+ if (!usedStateIdMap.containsKey(stateId)) {
+ usedStateIdMap.put(stateId, parDoName);
+ return stateId;
+ } else {
+ // Same state id identified for the first time
+ if (!stateIdsToRewrite.containsKey(stateId)) {
+ final String prevParDoName = usedStateIdMap.get(stateId);
+ final String prevMultiParDoStateId =
+ StoreIdUtils.toMultiParDoStoreId(stateId, prevParDoName);
+ usedStateIdMap.put(prevMultiParDoStateId, prevParDoName);
+ // Store the stateId with previous parDo name which will be used for config rewriting
+ stateIdsToRewrite.put(stateId, prevParDoName);
Review Comment:
Please do it in one shot. Needing a rewrite pass just means the code is not
good enough, and no ordinary developer will understand what is going on in
these 50 lines of code.
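A minimal sketch of the one-shot alternative, assuming the immutable `nonUniqueStateIds` set from the pre-scan is available when each ParDo's config is created. Every storeId is final the first time it is computed, so there is nothing to rewrite. `StoreIdMappingSketch` is hypothetical; the `stateId-escapedName` format follows the javadoc example in `rewriteConfigWithMultiParDoStateId`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: compute a ParDo's stateId -> storeId mapping in a single pass.
final class StoreIdMappingSketch {
  private StoreIdMappingSketch() {}

  static Map<String, String> stateIdToStoreMapping(
      List<String> stateIds, String escapedParDoName, Set<String> nonUniqueStateIds) {
    final Map<String, String> mapping = new LinkedHashMap<>();
    for (String stateId : stateIds) {
      // Only ids shared across ParDos get the ParDo name appended; others keep their id.
      final String storeId =
          nonUniqueStateIds.contains(stateId) ? stateId + "-" + escapedParDoName : stateId;
      mapping.put(stateId, storeId);
    }
    return Collections.unmodifiableMap(mapping);
  }
}
```

Unlike `getUniqueStoreId`, this never needs to revisit an earlier ParDo: the first occurrence of a shared id is already renamed because the set is known up front.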
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +108,46 @@ public static void createConfig(
pipeline.traverseTopologically(visitor);
}
+ /**
+ * Rewrite user store configs if there exists same state ids used in multiple ParDos. For each
+ * entry of a stateId to escaped PTransform name of first occurrence in topological traversal,
+ * rewrite RocksDB configs with the new mapping enforced from stateId to storeId.
+ * (eg) @StateId("foo") used in two ParDos fn, fn2: .apply("First Stateful ParDo with same
+ * stateId", ParDo.of(fn)) .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) Map =
+ * ("foo", "First_Stateful_ParDo_with_same_stateId") storeId =
+ * "foo-First_Stateful_ParDo_with_same_stateId"
+ */
+ public static void rewriteConfigWithMultiParDoStateId(
Review Comment:
get rid of this!
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -83,6 +84,10 @@ public void putAll(Map<String, String> properties) {
config.putAll(properties);
}
+ public void remove(String name) {
Review Comment:
get rid of this. The whole config generation should be immutable.
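One way to keep config generation immutable, sketched under assumptions: each store's entries are emitted exactly once from the final storeIds, and the result is returned as an unmodifiable map, so no `remove` is ever needed. `StoreConfigSketch` is hypothetical and emits only the Samza `stores.<name>.factory` key with `RocksDbKeyValueStorageEngineFactory`; the serde and other per-store settings the runner actually writes are omitted.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: build per-store configs once from final storeIds, never mutate afterwards.
final class StoreConfigSketch {
  private static final String ROCKSDB_FACTORY =
      "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";

  private StoreConfigSketch() {}

  static Map<String, String> storeConfigs(Collection<String> storeIds) {
    final Map<String, String> config = new HashMap<>();
    for (String storeId : storeIds) {
      // One entry per final storeId; other per-store settings are omitted in this sketch.
      config.put("stores." + storeId + ".factory", ROCKSDB_FACTORY);
    }
    return Collections.unmodifiableMap(config);
  }
}
```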
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
if (signature.usesState()) {
// set up user state configs
for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
- final String storeId = state.id();
-
- // TODO: remove validation after we support same state id in different ParDo.
- if (!ctx.addStateId(storeId)) {
- throw new IllegalStateException(
- "Duplicate StateId " + storeId + " found in multiple ParDo.");
- }
-
+ final String userStateId = state.id();
+ final String escapedParDoName =
+ SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+ final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName);
Review Comment:
You already have userStateIdToStoreIdMap; why not just call
get(userStateId)? This seems pretty redundant.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -83,6 +84,7 @@ public class TranslationContext {
private final Map<PValue, String> idMap;
private final Map<String, MessageStream> registeredInputStreams = new HashMap<>();
private final Map<String, Table> registeredTables = new HashMap<>();
+ private final Set<String> multiParDoStateIds;
Review Comment:
nonUniqueStateIds? The current name doesn't convey much.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+public class StoreIdUtils {
+
+ /**
+ * Join stateId and escaped PTransform name, used for RocksDB storeId of stateIds with multiple
+ * ParDos.
+ */
+ public static String toMultiParDoStoreId(String stateId, String escapedPTransformName) {
Review Comment:
toUniqueStoreId()? The current name is not clear to me.
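A minimal sketch of the helper under the proposed `toUniqueStoreId` name, assuming it keeps the `stateId-escapedName` format shown in the `rewriteConfigWithMultiParDoStateId` javadoc example (`"foo-First_Stateful_ParDo_with_same_stateId"`). The class name `StoreIdUtilsSketch` and the method body are illustrative; the diff does not show the PR's actual implementation.

```java
// Hypothetical sketch of the renamed helper; separator inferred from the javadoc example.
final class StoreIdUtilsSketch {
  private StoreIdUtilsSketch() {}

  /** Joins a state id and an escaped PTransform name into a store id that is unique per ParDo. */
  static String toUniqueStoreId(String stateId, String escapedPTransformName) {
    return stateId + "-" + escapedPTransformName;
  }
}
```

The resulting ids are only unique if the escaped full names of the ParDos sharing a state id are themselves distinct.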
--
This is an automated message from the Apache Git Service.