seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037946626
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -148,7 +149,8 @@ public DoFnOp(
JobInfo jobInfo,
Map<String, TupleTag<?>> idToTupleTagMap,
DoFnSchemaInformation doFnSchemaInformation,
- Map<?, PCollectionView<?>> sideInputMapping) {
+ Map<?, PCollectionView<?>> sideInputMapping,
+ Map<String, String> userStateIds) {
Review Comment:
As this is no longer needed, I reverted this diff, but I definitely agree.
Thank you
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,14 @@ public SamzaPipelineResult run(Pipeline pipeline) {
LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final Map<String, String> multiParDoStateIdMap = new HashMap<>();
Review Comment:
I totally agree - this approach seems much better. Made the change, thank
you!
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
// the stores include both beamStore for system states as well as stores for
user state
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ // Map of non-system stateIds to unique storeId
+ private final Map<String, String> stateIdMap;
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
private final String stageId;
private SamzaStoreStateInternals(
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
Review Comment:
Ack, removed the changes in this class.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
TaskContext context,
SamzaPipelineOptions pipelineOptions,
ExecutableStage executableStage) {
-
- Set<String> stateIds =
+ // TODO: handle same stateIds in multiple ParDos for portable mode
+ Map<String, String> stateIds =
executableStage.getUserStates().stream()
- .map(UserStateReference::localName)
- .collect(Collectors.toSet());
+ .collect(
+ Collectors.toMap(UserStateReference::localName,
UserStateReference::localName));
return createStateInternalsFactory(id, keyCoder, context, pipelineOptions,
stateIds);
}
@SuppressWarnings("unchecked")
- private static <K> Factory<K> createStateInternalsFactory(
+ static <K> Factory<K> createStateInternalsFactory(
String id,
@Nullable Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- Collection<String> stateIds) {
+ Map<String, String> stateIdMap) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new
HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
-
final Coder<K> stateKeyCoder;
if (keyCoder != null) {
- stateIds.forEach(
- stateId ->
- stores.put(
- stateId, (KeyValueStore<ByteArray, StateValue<?>>)
context.getStore(stateId)));
+ stateIdMap
+ .values()
+ .forEach(
+ storeId ->
+ stores.put(
Review Comment:
Ack, removed the changes in this class.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos()
{
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ private final Map<String, String> stateIdMap;
private final Coder<K> keyCoder;
private final int batchGetSize;
public Factory(
String stageId,
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+ Map<String, String> stateIdMap,
Coder<K> keyCoder,
int batchGetSize) {
this.stageId = stageId;
this.stores = stores;
+ this.stateIdMap = stateIdMap;
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
this.coder = coder;
this.namespace = namespace;
this.addressId = address.getId();
- this.isBeamStore = !stores.containsKey(address.getId());
+ this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +35,17 @@ public class ConfigContext {
private final Map<PValue, String> idMap;
private AppliedPTransform<?, ?, ?> currentTransform;
private final SamzaPipelineOptions options;
- private final Set<String> stateIds;
+ private final Map<String, String> usedStateIdMap;
+ private final Map<String, String> stateIdsToRewrite;
- public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions
options) {
+ public ConfigContext(
Review Comment:
Thank you for the suggestion here, made the change.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
if (signature.usesState()) {
// set up user state configs
for (DoFnSignature.StateDeclaration state :
signature.stateDeclarations().values()) {
- final String storeId = state.id();
-
- // TODO: remove validation after we support same state id in different
ParDo.
- if (!ctx.addStateId(storeId)) {
- throw new IllegalStateException(
- "Duplicate StateId " + storeId + " found in multiple ParDo.");
- }
-
+ final String userStateId = state.id();
+ final String escapedParDoName =
+
SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+ final String uniqueStoreId = ctx.getUniqueStoreId(userStateId,
escapedParDoName);
Review Comment:
Only the config creation part is needed for translation, so I removed this part.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+public class StoreIdUtils {
+
+ /**
+ * Join stateId and escaped PTransform name, used for RocksDB storeId of
stateIds with multiple
+ * ParDos.
+ */
+ public static String toMultiParDoStoreId(String stateId, String
escapedPTransformName) {
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -92,14 +90,14 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping,
OpEmitter emitter,
- FutureCollector futureCollector) {
+ FutureCollector futureCollector,
+ Map<String, String> userStateIds) {
Review Comment:
Sounds good
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
// the stores include both beamStore for system states as well as stores for
user state
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ // Map of non-system stateIds to unique storeId
+ private final Map<String, String> stateIdMap;
Review Comment:
Sounds good.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos()
{
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ private final Map<String, String> stateIdMap;
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -303,7 +298,8 @@ public StateInternals stateInternalsForKey(@Nullable K key)
{
throw new RuntimeException("Cannot encode key for state store", e);
}
- return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(),
stageId, batchGetSize);
+ return new SamzaStoreStateInternals<>(
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
TaskContext context,
SamzaPipelineOptions pipelineOptions,
ExecutableStage executableStage) {
-
- Set<String> stateIds =
+ // TODO: handle same stateIds in multiple ParDos for portable mode
+ Map<String, String> stateIds =
executableStage.getUserStates().stream()
- .map(UserStateReference::localName)
- .collect(Collectors.toSet());
+ .collect(
+ Collectors.toMap(UserStateReference::localName,
UserStateReference::localName));
return createStateInternalsFactory(id, keyCoder, context, pipelineOptions,
stateIds);
}
@SuppressWarnings("unchecked")
- private static <K> Factory<K> createStateInternalsFactory(
+ static <K> Factory<K> createStateInternalsFactory(
String id,
@Nullable Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- Collection<String> stateIds) {
+ Map<String, String> stateIdMap) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new
HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
-
final Coder<K> stateKeyCoder;
if (keyCoder != null) {
- stateIds.forEach(
- stateId ->
- stores.put(
- stateId, (KeyValueStore<ByteArray, StateValue<?>>)
context.getStore(stateId)));
+ stateIdMap
Review Comment:
Got it - thank you.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
TaskContext context,
SamzaPipelineOptions pipelineOptions,
ExecutableStage executableStage) {
-
- Set<String> stateIds =
+ // TODO: handle same stateIds in multiple ParDos for portable mode
+ Map<String, String> stateIds =
executableStage.getUserStates().stream()
- .map(UserStateReference::localName)
- .collect(Collectors.toSet());
+ .collect(
+ Collectors.toMap(UserStateReference::localName,
UserStateReference::localName));
return createStateInternalsFactory(id, keyCoder, context, pipelineOptions,
stateIds);
}
@SuppressWarnings("unchecked")
- private static <K> Factory<K> createStateInternalsFactory(
+ static <K> Factory<K> createStateInternalsFactory(
String id,
@Nullable Coder<K> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- Collection<String> stateIds) {
+ Map<String, String> stateIdMap) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new
HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
-
final Coder<K> stateKeyCoder;
if (keyCoder != null) {
- stateIds.forEach(
- stateId ->
- stores.put(
- stateId, (KeyValueStore<ByteArray, StateValue<?>>)
context.getStore(stateId)));
+ stateIdMap
+ .values()
+ .forEach(
+ storeId ->
+ stores.put(
+ storeId,
+ (KeyValueStore<ByteArray, StateValue<?>>)
context.getStore(storeId)));
stateKeyCoder = keyCoder;
} else {
stateKeyCoder = (Coder<K>) VoidCoder.of();
}
- return new Factory<>(Objects.toString(id), stores, stateKeyCoder,
batchGetSize);
+ return new Factory<>(Objects.toString(id), stores, stateIdMap,
stateKeyCoder, batchGetSize);
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
// the stores include both beamStore for system states as well as stores for
user state
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ // Map of non-system stateIds to unique storeId
+ private final Map<String, String> stateIdMap;
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
private final String stageId;
private SamzaStoreStateInternals(
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+ Map<String, String> stateIds,
Review Comment:
Ack, removed the changes in this class.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos()
{
public static class Factory<K> implements StateInternalsFactory<K> {
private final String stageId;
private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+ private final Map<String, String> stateIdMap;
private final Coder<K> keyCoder;
private final int batchGetSize;
public Factory(
String stageId,
Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+ Map<String, String> stateIdMap,
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -83,6 +84,10 @@ public void putAll(Map<String, String> properties) {
config.putAll(properties);
}
+ public void remove(String name) {
Review Comment:
Makes sense - done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
this.coder = coder;
this.namespace = namespace;
this.addressId = address.getId();
- this.isBeamStore = !stores.containsKey(address.getId());
+ this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));
this.store =
isBeamStore
? (KeyValueStore) stores.get(BEAM_STORE)
- : (KeyValueStore) stores.get(address.getId());
+ : (KeyValueStore) stores.get(stateIdMap.get(address.getId()));
Review Comment:
Done.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -64,8 +69,39 @@ public SamzaPipelineOptions getPipelineOptions() {
return this.options;
}
- public boolean addStateId(String stateId) {
- return stateIds.add(stateId);
+ /** Helper to keep track of used stateIds and return unique store id. */
+ public String getUniqueStoreId(String stateId, String parDoName) {
+ // Update a map of used state id with parDo name.
+ if (!usedStateIdMap.containsKey(stateId)) {
+ usedStateIdMap.put(stateId, parDoName);
+ return stateId;
+ } else {
+ // Same state id identified for the first time
+ if (!stateIdsToRewrite.containsKey(stateId)) {
+ final String prevParDoName = usedStateIdMap.get(stateId);
+ final String prevMultiParDoStateId =
+ StoreIdUtils.toMultiParDoStoreId(stateId, prevParDoName);
+ usedStateIdMap.put(prevMultiParDoStateId, prevParDoName);
+ // Store the stateId with previous parDo name which will be used for
config rewriting
+ stateIdsToRewrite.put(stateId, prevParDoName);
Review Comment:
Changed the logic to prescanning.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +108,46 @@ public static void createConfig(
pipeline.traverseTopologically(visitor);
}
+ /**
+ * Rewrite user store configs if there exists same state ids used in
multiple ParDos. For each
+ * entry of a stateId to escaped PTransform name of first occurrence in
topological traversal,
+ * rewrite RocksDB configs with the new mapping enforced from stateId to
storeId.
+ * (eg) @StateId("foo") used in two ParDos fn, fn2: .apply("First Stateful
ParDo with same
+ * stateId", ParDo.of(fn)) .apply("Second Stateful ParDo with same stateId",
ParDo.of(fn2)) Map =
+ * ("foo", "First_Stateful_ParDo_with_same_stateId") storeId =
+ * "foo-First_Stateful_ParDo_with_same_stateId"
+ */
+ public static void rewriteConfigWithMultiParDoStateId(
Review Comment:
Thank you :-)
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -83,6 +84,7 @@ public class TranslationContext {
private final Map<PValue, String> idMap;
private final Map<String, MessageStream> registeredInputStreams = new
HashMap<>();
private final Map<String, Table> registeredTables = new HashMap<>();
+ private final Set<String> multiParDoStateIds;
Review Comment:
I really like "nonUniqueStateIds", so I made the change throughout the diff.
Thank you!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]