This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 86ab945 SAMZA-2382: User specified side input processor should take
precedence over the identity processor (#1218)
86ab945 is described below
commit 86ab945cea711718ba84d2c2b726b78b548b92c7
Author: rmatharu <[email protected]>
AuthorDate: Tue Nov 12 20:15:59 2019 -0800
SAMZA-2382: User specified side input processor should take precedence over
the identity processor (#1218)
User-specified side-input processors should take precedence over the
identity processor for standby tasks.
---
.../samza/storage/ContainerStorageManager.java | 77 ++++++++++++----------
1 file changed, 41 insertions(+), 36 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 7b97e6f..afd3e69 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -526,51 +526,56 @@ public class ContainerStorageManager {
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
Map<TaskName, Map<String, SideInputsProcessor>>
sideInputStoresToProcessors = new HashMap<>();
- getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) ->
{
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
sideInputStoresToProcessors.put(taskName, new HashMap<>());
+ TaskMode taskMode = taskModel.getTaskMode();
+
for (String storeName : sideInputSystemStreams.keySet()) {
+
+ SideInputsProcessor sideInputsProcessor;
Optional<String> sideInputsProcessorSerializedInstance =
config.getSideInputsProcessorSerializedInstance(storeName);
+
if (sideInputsProcessorSerializedInstance.isPresent()) {
- sideInputStoresToProcessors.get(taskName)
- .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
- sideInputsProcessorSerializedInstance.get()));
- } else {
- String sideInputsProcessorFactoryClassName =
config.getSideInputsProcessorFactory(storeName)
- .orElseThrow(() -> new SamzaException(
- String.format("Could not find sideInputs processor factory
for store: %s", storeName)));
+
+ sideInputsProcessor = SerdeUtils.deserialize("Side Inputs
Processor", sideInputsProcessorSerializedInstance.get());
+ LOG.info("Using serialized side-inputs-processor for store: {},
task: {}", storeName, taskName);
+
+ } else if
(config.getSideInputsProcessorFactory(storeName).isPresent()) {
+ String sideInputsProcessorFactoryClassName =
config.getSideInputsProcessorFactory(storeName).get();
SideInputsProcessorFactory sideInputsProcessorFactory =
ReflectionUtil.getObj(sideInputsProcessorFactoryClassName,
SideInputsProcessorFactory.class);
- SideInputsProcessor sideInputsProcessor =
- sideInputsProcessorFactory.getSideInputsProcessor(config,
taskInstanceMetrics.get(taskName).registry());
- sideInputStoresToProcessors.get(taskName).put(storeName,
sideInputsProcessor);
- }
- }
- });
+ sideInputsProcessor =
sideInputsProcessorFactory.getSideInputsProcessor(config,
taskInstanceMetrics.get(taskName).registry());
+ LOG.info("Using side-inputs-processor from factory: {} for store:
{}, task: {}", config.getSideInputsProcessorFactory(storeName).get(),
storeName, taskName);
- // creating identity sideInputProcessor for stores of standbyTasks
- getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel)
-> {
- sideInputStoresToProcessors.put(taskName, new HashMap<>());
- for (String storeName : sideInputSystemStreams.keySet()) {
-
- // have to use the right serde because the sideInput stores are
created
- Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
- .orElseThrow(() -> new SamzaException("Could not find storage
key serde for store: " + storeName)));
- Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
- .orElseThrow(() -> new SamzaException("Could not find storage
msg serde for store: " + storeName)));
- sideInputStoresToProcessors.get(taskName).put(storeName, new
SideInputsProcessor() {
- @Override
- public Collection<Entry<?, ?>> process(IncomingMessageEnvelope
message, KeyValueStore store) {
- // Ignore message if the key is null
- if (message.getKey() == null) {
- return ImmutableList.of();
- } else {
- // Skip serde if the message is null
- return ImmutableList.of(new
Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
- message.getMessage() == null ? null :
msgSerde.fromBytes((byte[]) message.getMessage())));
+ } else {
+ // if this is a active-task with a side-input store but no
sideinput-processor-factory defined in config, we rely on upstream validations
to fail the deploy
+
+ // if this is a standby-task and the store is a non-side-input
changelog store
+ // we creating identity sideInputProcessor for stores of
standbyTasks
+ // have to use the right serde because the sideInput stores are
created
+
+ Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
+ .orElseThrow(() -> new SamzaException("Could not find storage
key serde for store: " + storeName)));
+ Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
+ .orElseThrow(() -> new SamzaException("Could not find storage
msg serde for store: " + storeName)));
+ sideInputsProcessor = new SideInputsProcessor() {
+ @Override
+ public Collection<Entry<?, ?>> process(IncomingMessageEnvelope
message, KeyValueStore store) {
+ // Ignore message if the key is null
+ if (message.getKey() == null) {
+ return ImmutableList.of();
+ } else {
+ // Skip serde if the message is null
+ return ImmutableList.of(new
Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
+ message.getMessage() == null ? null :
msgSerde.fromBytes((byte[]) message.getMessage())));
+ }
}
- }
- });
+ };
+ LOG.info("Using identity side-inputs-processor for store: {},
task: {}", storeName, taskName);
+ }
+
+ sideInputStoresToProcessors.get(taskName).put(storeName,
sideInputsProcessor);
}
});