mynameborat commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r435042186
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -714,15 +729,15 @@ private void startSideInputs() {
LOG.info("SideInput Restore started");
// initialize the sideInputStorageManagers
- getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.init());
+ getSideInputHandlers().forEach(handler -> handler.init());
Review comment:
minor: prefer a method reference over the lambda here.
If you choose to, please apply it in the other places as well.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
+ private final TaskSideInputStorageManager taskSideInputStorageManager;
+ private final Map<SystemStreamPartition, Set<String>> sspToStores;
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ private final SystemAdmins systemAdmins;
+ private final StreamMetadataCache streamMetadataCache;
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
+
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+ public TaskSideInputHandler(
+ TaskName taskName,
+ TaskMode taskMode,
+ File storeBaseDir,
+ Map<String, StorageEngine> storeToStorageEngines,
+ Map<String, Set<SystemStreamPartition>> storeToSSPs,
+ Map<String, SideInputsProcessor> storeToProcessor,
+ SystemAdmins systemAdmins,
+ StreamMetadataCache streamMetadataCache,
+ Clock clock) {
+ this.taskName = taskName;
+ this.systemAdmins = systemAdmins;
+ this.streamMetadataCache = streamMetadataCache;
+ this.storeToProcessor = storeToProcessor;
+
+ this.sspToStores = storeToSSPs.entrySet().stream()
+ .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+ .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp,
storeAndSSPs.getKey())))
+ .collect(Collectors.groupingBy(
+ Map.Entry::getKey,
+ Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
Review comment:
Prefer to keep the old way of initializing this map:
```
this.sspsToStores = new HashMap<>();
storesToSSPs.forEach((store, ssps) -> {
for (SystemStreamPartition ssp: ssps) {
sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>());
sspsToStores.computeIfPresent(ssp, (key, value) -> {
value.add(store);
return value;
});
}
});
```
as it's much more readable and simpler.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
+ private final TaskSideInputStorageManager taskSideInputStorageManager;
+ private final Map<SystemStreamPartition, Set<String>> sspToStores;
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ private final SystemAdmins systemAdmins;
+ private final StreamMetadataCache streamMetadataCache;
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
+
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+ public TaskSideInputHandler(
+ TaskName taskName,
+ TaskMode taskMode,
+ File storeBaseDir,
+ Map<String, StorageEngine> storeToStorageEngines,
+ Map<String, Set<SystemStreamPartition>> storeToSSPs,
+ Map<String, SideInputsProcessor> storeToProcessor,
+ SystemAdmins systemAdmins,
+ StreamMetadataCache streamMetadataCache,
+ Clock clock) {
+ this.taskName = taskName;
+ this.systemAdmins = systemAdmins;
+ this.streamMetadataCache = streamMetadataCache;
+ this.storeToProcessor = storeToProcessor;
+
+ this.sspToStores = storeToSSPs.entrySet().stream()
+ .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+ .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp,
storeAndSSPs.getKey())))
+ .collect(Collectors.groupingBy(
+ Map.Entry::getKey,
+ Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+ this.taskSideInputStorageManager = new
TaskSideInputStorageManager(taskName,
+ taskMode,
+ storeBaseDir,
+ storeToStorageEngines,
+ storeToSSPs,
+ clock);
+
+ validateProcessorConfiguration();
+ }
+
+ /**
+ * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+ *
+ * @return the task name for this handler
+ */
+ public TaskName getTaskName() {
+ return this.taskName;
+ }
+
+ /**
+ * Initializes the underlying {@link TaskSideInputStorageManager} and
determines starting offsets for each SSP.
+ */
+ public void init() {
+ this.taskSideInputStorageManager.init();
+
+ Map<SystemStreamPartition, String> fileOffsets =
this.taskSideInputStorageManager.getFileOffsets();
+ LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+ this.lastProcessedOffsets.putAll(fileOffsets);
+ LOG.info("Last processed offsets for the task {}: {}", taskName,
lastProcessedOffsets);
+
+ this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+ LOG.info("Starting offsets for the task {}: {}", taskName,
startingOffsets);
+ }
+
+ /**
+ * Processes the incoming side input message envelope and updates the last
processed offset for its SSP.
+ * Synchronized inorder to be exclusive with flush().
+ *
+ * @param envelope incoming envelope to be processed
+ */
+ public synchronized void process(IncomingMessageEnvelope envelope) {
Review comment:
Can we synchronize on a specific block instead, or just make `lastProcessedOffsets`
a thread-safe data structure?
If neither works, can you add documentation to clarify why?
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -258,9 +142,10 @@ private void initializeStoreDirectories() {
/**
* Writes the offset files for all side input stores one by one. There is
one offset file per store.
* Its contents are a JSON encoded mapping from each side input SSP to its
last processed offset, and a checksum.
+ *
+ * @param lastProcessedOffsets The offset per SSP to write
*/
- @VisibleForTesting
- void writeOffsetFiles() {
+ public void writeOffsetFiles(Map<SystemStreamPartition, String>
lastProcessedOffsets) {
Review comment:
nit: now that you are changing the signature anyway, can we rename this
to `writeFileOffsets` so that it is consistent with `getFileOffsets`?
What do you think?
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
+ private final TaskSideInputStorageManager taskSideInputStorageManager;
+ private final Map<SystemStreamPartition, Set<String>> sspToStores;
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ private final SystemAdmins systemAdmins;
+ private final StreamMetadataCache streamMetadataCache;
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
+
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+ public TaskSideInputHandler(
+ TaskName taskName,
+ TaskMode taskMode,
+ File storeBaseDir,
+ Map<String, StorageEngine> storeToStorageEngines,
+ Map<String, Set<SystemStreamPartition>> storeToSSPs,
+ Map<String, SideInputsProcessor> storeToProcessor,
+ SystemAdmins systemAdmins,
+ StreamMetadataCache streamMetadataCache,
+ Clock clock) {
+ this.taskName = taskName;
+ this.systemAdmins = systemAdmins;
+ this.streamMetadataCache = streamMetadataCache;
+ this.storeToProcessor = storeToProcessor;
+
+ this.sspToStores = storeToSSPs.entrySet().stream()
+ .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+ .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp,
storeAndSSPs.getKey())))
+ .collect(Collectors.groupingBy(
+ Map.Entry::getKey,
+ Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+ this.taskSideInputStorageManager = new
TaskSideInputStorageManager(taskName,
+ taskMode,
+ storeBaseDir,
+ storeToStorageEngines,
+ storeToSSPs,
+ clock);
+
+ validateProcessorConfiguration();
+ }
+
+ /**
+ * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+ *
+ * @return the task name for this handler
+ */
+ public TaskName getTaskName() {
+ return this.taskName;
+ }
+
+ /**
+ * Initializes the underlying {@link TaskSideInputStorageManager} and
determines starting offsets for each SSP.
+ */
+ public void init() {
+ this.taskSideInputStorageManager.init();
+
+ Map<SystemStreamPartition, String> fileOffsets =
this.taskSideInputStorageManager.getFileOffsets();
+ LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+ this.lastProcessedOffsets.putAll(fileOffsets);
+ LOG.info("Last processed offsets for the task {}: {}", taskName,
lastProcessedOffsets);
+
+ this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+ LOG.info("Starting offsets for the task {}: {}", taskName,
startingOffsets);
+ }
+
+ /**
+ * Processes the incoming side input message envelope and updates the last
processed offset for its SSP.
+ * Synchronized inorder to be exclusive with flush().
+ *
+ * @param envelope incoming envelope to be processed
+ */
+ public synchronized void process(IncomingMessageEnvelope envelope) {
+ SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+ String envelopeOffset = envelope.getOffset();
+
+ for (String store: this.sspToStores.get(envelopeSSP)) {
+ SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+ KeyValueStore keyValueStore = (KeyValueStore)
this.taskSideInputStorageManager.getStore(store);
+ Collection<Entry<?, ?>> entriesToBeWritten =
storeProcessor.process(envelope, keyValueStore);
+
+ // TODO: SAMZA-2255: optimize writes to side input stores
+ for (Entry entry : entriesToBeWritten) {
+ // If the key is null we ignore, if the value is null, we issue a
delete, else we issue a put
+ if (entry.getKey() != null) {
+ if (entry.getValue() != null) {
+ keyValueStore.put(entry.getKey(), entry.getValue());
+ } else {
+ keyValueStore.delete(entry.getKey());
+ }
+ }
+ }
+ }
+
+ this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+ }
+
+ /**
+ * Flushes the underlying {@link TaskSideInputStorageManager}
+ * Synchronized inorder to be exclusive with process()
+ */
+ public synchronized void flush() {
+ this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+ }
+
+ /**
+ * Gets the starting offset for the given side input {@link
SystemStreamPartition}.
+ *
+ * Note: The method doesn't respect {@link
org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
+ * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET}
configurations. It will use the local offset
+ * file if it is valid, else it will fall back to oldest offset in the
stream.
+ *
+ * @param ssp side input system stream partition to get the starting offset
for
+ * @return the starting offset
+ */
+ public String getStartingOffset(SystemStreamPartition ssp) {
+ return this.startingOffsets.get(ssp);
+ }
+
+ /**
+ * Gets the last processed offset for the given side input {@link
SystemStreamPartition}.
+ *
+ * @param ssp side input system stream partition to get the last processed
offset for
+ * @return the last processed offset
+ */
+ public String getLastProcessedOffset(SystemStreamPartition ssp) {
+ return this.lastProcessedOffsets.get(ssp);
+ }
+
+ /**
+ * Stops the underlying storage manager at the last processed offsets.
+ */
+ public void stop() {
Review comment:
I might have forgotten why `flush` and `process` need to be synchronized.
If it is because of `lastProcessedOffsets`, then we may have to synchronize
in other places too.
Or it may be about access to `taskSideInputStorageManager`, because
that class is not thread safe.
Either way, `stop` looks like it requires synchronization.
##########
File path:
samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
Review comment:
Replace the wildcard imports with explicit imports.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -279,13 +164,12 @@ void writeOffsetFiles() {
}
/**
- * Gets the side input SSP offsets for all stores from their local offset
files.
+ * Gets the side input SSP offsets for all stores from their local offset
files. This method should be executed only
+ * once at class initialization.
Review comment:
What happens if it is executed after initialization?
If you really expect this to be executed only once, before init, should we
add a precondition check that throws an exception, so that the consequence of
invoking it multiple times is made explicit?
##########
File path:
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -329,18 +345,14 @@ public ContainerStorageManager(
/**
* Creates SystemConsumer objects for store restoration, creating one
consumer per system.
*/
- private static Map<String, SystemConsumer> createConsumers(Map<String,
Set<SystemStream>> systemStreams,
+ private static Map<String, SystemConsumer> createConsumers(Set<String>
systems,
Review comment:
Can we give this a more meaningful name than `systems`? I see the call sites
pass `containerSideInputSystems` and `containerChangelogSystems`, so maybe
keep `storeSystems`, or anything that conveys more information than `systems` :)
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
+ private final TaskSideInputStorageManager taskSideInputStorageManager;
+ private final Map<SystemStreamPartition, Set<String>> sspToStores;
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ private final SystemAdmins systemAdmins;
+ private final StreamMetadataCache streamMetadataCache;
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
+
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+ public TaskSideInputHandler(
+ TaskName taskName,
+ TaskMode taskMode,
+ File storeBaseDir,
+ Map<String, StorageEngine> storeToStorageEngines,
+ Map<String, Set<SystemStreamPartition>> storeToSSPs,
+ Map<String, SideInputsProcessor> storeToProcessor,
+ SystemAdmins systemAdmins,
+ StreamMetadataCache streamMetadataCache,
+ Clock clock) {
Review comment:
Here you put each parameter on its own line, but in the storage manager you
changed the constructor from one parameter per line to parameters wrapped over a few lines.
I'd prefer this to also follow suit with `TaskSideInputStorageManager`,
since that is concise, and putting each parameter on its own line doesn't add
much value in a constructor.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
+ private final TaskSideInputStorageManager taskSideInputStorageManager;
+ private final Map<SystemStreamPartition, Set<String>> sspToStores;
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ private final SystemAdmins systemAdmins;
+ private final StreamMetadataCache streamMetadataCache;
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
+
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+ public TaskSideInputHandler(
+ TaskName taskName,
+ TaskMode taskMode,
+ File storeBaseDir,
+ Map<String, StorageEngine> storeToStorageEngines,
+ Map<String, Set<SystemStreamPartition>> storeToSSPs,
+ Map<String, SideInputsProcessor> storeToProcessor,
+ SystemAdmins systemAdmins,
+ StreamMetadataCache streamMetadataCache,
+ Clock clock) {
+ this.taskName = taskName;
+ this.systemAdmins = systemAdmins;
+ this.streamMetadataCache = streamMetadataCache;
+ this.storeToProcessor = storeToProcessor;
+
+ this.sspToStores = storeToSSPs.entrySet().stream()
+ .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+ .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp,
storeAndSSPs.getKey())))
+ .collect(Collectors.groupingBy(
+ Map.Entry::getKey,
+ Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+ this.taskSideInputStorageManager = new
TaskSideInputStorageManager(taskName,
+ taskMode,
+ storeBaseDir,
+ storeToStorageEngines,
+ storeToSSPs,
+ clock);
+
+ validateProcessorConfiguration();
Review comment:
Suggest moving this to the top of the constructor, taking the stores as a
parameter, so that it fails fast before any other initialization happens.
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
Review comment:
nit: why is this separate from the other private fields? If the intention is
to separate the private final fields that are initialized at declaration, then should
`lastProcessedOffsets` be moved here as well?
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+ private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
+
+ private final TaskName taskName;
+ private final TaskSideInputStorageManager taskSideInputStorageManager;
+ private final Map<SystemStreamPartition, Set<String>> sspToStores;
+ private final Map<String, SideInputsProcessor> storeToProcessor;
+ private final SystemAdmins systemAdmins;
+ private final StreamMetadataCache streamMetadataCache;
+ private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
+
+ private Map<SystemStreamPartition, String> startingOffsets;
+
+ public TaskSideInputHandler(
+ TaskName taskName,
+ TaskMode taskMode,
+ File storeBaseDir,
+ Map<String, StorageEngine> storeToStorageEngines,
+ Map<String, Set<SystemStreamPartition>> storeToSSPs,
+ Map<String, SideInputsProcessor> storeToProcessor,
+ SystemAdmins systemAdmins,
+ StreamMetadataCache streamMetadataCache,
+ Clock clock) {
+ this.taskName = taskName;
+ this.systemAdmins = systemAdmins;
+ this.streamMetadataCache = streamMetadataCache;
+ this.storeToProcessor = storeToProcessor;
+
+ this.sspToStores = storeToSSPs.entrySet().stream()
+ .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+ .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp,
storeAndSSPs.getKey())))
+ .collect(Collectors.groupingBy(
+ Map.Entry::getKey,
+ Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+ this.taskSideInputStorageManager = new
TaskSideInputStorageManager(taskName,
+ taskMode,
+ storeBaseDir,
+ storeToStorageEngines,
+ storeToSSPs,
+ clock);
Review comment:
Why are we constructing the storage manager here? Why not have the
ContainerStorageManager (CSM) pass an already-constructed storage manager to the handler?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]