This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9346522 SAMZA-2530: Split out processing logic from
TaskSideInputStorageManager (#1367)
9346522 is described below
commit 9346522e4af3259d3e85e317839ff96e278409f9
Author: bkonold <[email protected]>
AuthorDate: Tue Jun 9 15:46:07 2020 -0700
SAMZA-2530: Split out processing logic from TaskSideInputStorageManager
(#1367)
---
.../apache/samza/storage/TaskSideInputHandler.java | 273 +++++++++++++++++++++
.../samza/storage/TaskSideInputStorageManager.java | 233 ++----------------
.../samza/storage/ContainerStorageManager.java | 150 +++++------
.../samza/storage/TestTaskSideInputHandler.java | 202 +++++++++++++++
.../storage/TestTaskSideInputStorageManager.java | 145 ++---------
5 files changed, 596 insertions(+), 407 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
new file mode 100644
index 0000000..56aeb85
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Encapsulates all processing logic and state for the side input SSPs of a single task.
+ *
+ * <p>Incoming side input envelopes are applied to the task's side input stores through their
+ * {@link SideInputsProcessor}s, and the last processed offset of every SSP is tracked so that the
+ * underlying {@link TaskSideInputStorageManager} can persist it on {@link #flush()} and {@link #stop()}.
+ * {@link #process} and {@link #flush} are synchronized with respect to each other.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+  // Written under the handler's lock in process(), but read without it by getLastProcessedOffset() and stop().
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  // Inverse of the store -> SSPs mapping: every side input SSP mapped to the stores it feeds.
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+
+  // Computed by init(); null until init() has been called.
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  /**
+   * Creates a handler for the side input stores of a single task.
+   *
+   * @param taskName the task whose side inputs are handled
+   * @param taskMode the task mode, forwarded to the {@link TaskSideInputStorageManager}
+   * @param storeBaseDir base directory for the side input stores
+   * @param storeToStorageEngines side input store name to its {@link StorageEngine}
+   * @param storeToSSPs side input store name to the SSPs that feed it
+   * @param storeToProcessor side input store name to the {@link SideInputsProcessor} used to populate it
+   * @param systemAdmins admins passed to {@link StorageManagerUtil} when choosing starting offsets
+   * @param streamMetadataCache cache used to look up the oldest available offset of each SSP
+   * @param clock clock forwarded to the {@link TaskSideInputStorageManager}
+   * @throws SamzaException if a store in {@code storeToSSPs} has no processor in {@code storeToProcessor},
+   *         or if the {@link TaskSideInputStorageManager} rejects the store configuration
+   */
+  public TaskSideInputHandler(TaskName taskName, TaskMode taskMode, File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines, Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor, SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache, Clock clock) {
+    validateProcessorConfiguration(storeToSSPs.keySet(), storeToProcessor);
+
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = new HashMap<>();
+    storeToSSPs.forEach((store, ssps) -> {
+      for (SystemStreamPartition ssp : ssps) {
+        this.sspToStores.computeIfAbsent(ssp, key -> new HashSet<>()).add(store);
+      }
+    });
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName, taskMode, storeBaseDir,
+        storeToStorageEngines, storeToSSPs, clock);
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}.
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager}, seeds the last processed offsets from the
+   * local offset files, and determines the starting offset for each side input SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
+    LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   * Synchronized in order to be exclusive with {@link #flush()}.
+   *
+   * @param envelope incoming envelope to be processed; its SSP is expected to be one of this task's side input SSPs
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store : this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore, if the value is null, we issue a delete, else we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }
+
+  /**
+   * Flushes the underlying {@link TaskSideInputStorageManager} at the last processed offsets.
+   * Synchronized in order to be exclusive with {@link #process}.
+   */
+  public synchronized void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offset for the given side input {@link SystemStreamPartition}.
+   *
+   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} configurations. It will use the local offset
+   * file if it is valid, else it will fall back to oldest offset in the stream.
+   *
+   * <p>Starting offsets are computed by {@link #init()}, which must be called before this method.
+   *
+   * @param ssp side input system stream partition to get the starting offset for
+   * @return the starting offset
+   */
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  /**
+   * Gets the last processed offset for the given side input {@link SystemStreamPartition}.
+   *
+   * @param ssp side input system stream partition to get the last processed offset for
+   * @return the last processed offset, or null if none is known for the SSP
+   */
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  /**
+   * Stops the underlying storage manager at the last processed offsets. Any pending and upcoming invocations
+   * of {@link #process} and {@link #flush} are assumed to have completed or ceased prior to calling this method.
+   */
+  public void stop() {
+    this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores.
+   * If the local file offset is available and is greater than the oldest available offset from source, uses it,
+   * else falls back to oldest offset in the source.
+   *
+   * @param fileOffsets offsets from the local offset file
+   * @param oldestOffsets oldest offsets from the source
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getStartingOffsets(
+      Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
+    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
+
+    this.sspToStores.keySet().forEach(ssp -> {
+      String fileOffset = fileOffsets.get(ssp);
+      String oldestOffset = oldestOffsets.get(ssp);
+
+      startingOffsets.put(ssp,
+          this.storageManagerUtil.getStartingOffset(
+              ssp, this.systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, oldestOffset));
+    });
+
+    return startingOffsets;
+  }
+
+  /**
+   * Gets the oldest offset for the {@link SystemStreamPartition}s associated with all the store side inputs.
+   * 1. Groups the list of the SSPs based on system stream
+   * 2. Fetches the {@link SystemStreamMetadata} from {@link StreamMetadataCache}
+   * 3. Fetches the partition metadata for each system stream and fetch the corresponding partition metadata
+   *    and populates the oldest offset for SSPs belonging to the system stream.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If partitionMetadata could not be
+   *         obtained for any {@link SystemStreamPartition} the offset for it is populated as null.
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getOldestOffsets() {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+
+    // Step 1
+    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = this.sspToStores.keySet().stream()
+        .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+    // Step 2
+    Map<SystemStream, SystemStreamMetadata> metadata = JavaConverters.mapAsJavaMapConverter(
+        this.streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(), false)).asJava();
+
+    // Step 3
+    metadata.forEach((systemStream, systemStreamMetadata) -> {
+
+      // get the partition metadata for each system stream
+      Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
+          systemStreamMetadata.getSystemStreamPartitionMetadata();
+
+      // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset.
+      // If partition metadata was not obtained for an SSP, populate its oldest offset as null.
+      // A plain loop into a HashMap is used because Collectors.toMap NPEs on null values
+      // (https://bugs.openjdk.java.net/browse/JDK-8148463).
+      for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+        SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = partitionMetadata.get(ssp.getPartition());
+        oldestOffsets.put(ssp, sspMetadata == null ? null : sspMetadata.getOldestOffset());
+      }
+    });
+
+    return oldestOffsets;
+  }
+
+  /**
+   * Validates that each store has an associated {@link SideInputsProcessor}.
+   *
+   * @param stores names of the side input stores
+   * @param storeToProcessor side input store name to its processor
+   * @throws SamzaException if any store in {@code stores} has no processor
+   */
+  private void validateProcessorConfiguration(Set<String> stores, Map<String, SideInputsProcessor> storeToProcessor) {
+    stores.forEach(storeName -> {
+      if (!storeToProcessor.containsKey(storeName)) {
+        throw new SamzaException(
+            String.format("Side inputs processor missing for store: %s.", storeName));
+      }
+    });
+  }
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 9cd888a..0a4e763 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -20,34 +20,20 @@
package org.apache.samza.storage;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
import java.io.File;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -62,53 +48,28 @@ public class TaskSideInputStorageManager {
private static final long STORE_DELETE_RETENTION_MS =
TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
private final Clock clock;
- private final Map<String, SideInputsProcessor> storeToProcessor;
private final Map<String, StorageEngine> stores;
private final File storeBaseDir;
private final Map<String, Set<SystemStreamPartition>> storeToSSps;
- private final Map<SystemStreamPartition, Set<String>> sspsToStores;
- private final StreamMetadataCache streamMetadataCache;
- private final SystemAdmins systemAdmins;
private final TaskName taskName;
private final TaskMode taskMode;
- private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
private final StorageManagerUtil storageManagerUtil = new
StorageManagerUtil();
- private Map<SystemStreamPartition, String> startingOffsets;
+ public TaskSideInputStorageManager(TaskName taskName, TaskMode taskMode,
File storeBaseDir, Map<String, StorageEngine> sideInputStores,
+ Map<String, Set<SystemStreamPartition>> storesToSSPs, Clock clock) {
+ validateStoreConfiguration(sideInputStores);
- public TaskSideInputStorageManager(
- TaskName taskName,
- TaskMode taskMode,
- StreamMetadataCache streamMetadataCache,
- File storeBaseDir,
- Map<String, StorageEngine> sideInputStores,
- Map<String, SideInputsProcessor> storesToProcessor,
- Map<String, Set<SystemStreamPartition>> storesToSSPs,
- SystemAdmins systemAdmins,
- Config config,
- Clock clock) {
this.clock = clock;
this.stores = sideInputStores;
this.storeBaseDir = storeBaseDir;
this.storeToSSps = storesToSSPs;
- this.streamMetadataCache = streamMetadataCache;
- this.systemAdmins = systemAdmins;
this.taskName = taskName;
this.taskMode = taskMode;
- this.storeToProcessor = storesToProcessor;
-
- validateStoreConfiguration();
+ }
- this.sspsToStores = new HashMap<>();
- storesToSSPs.forEach((store, ssps) -> {
- for (SystemStreamPartition ssp: ssps) {
- sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>());
- sspsToStores.computeIfPresent(ssp, (key, value) -> {
- value.add(store);
- return value;
- });
- }
- });
+ // Get the taskName associated with this instance.
+ public TaskName getTaskName() {
+ return this.taskName;
}
/**
@@ -117,38 +78,32 @@ public class TaskSideInputStorageManager {
public void init() {
LOG.info("Initializing side input stores.");
- Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
- LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
-
- Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
- LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
-
- startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
- LOG.info("Starting offsets for the task {}: {}", taskName,
startingOffsets);
-
- lastProcessedOffsets.putAll(fileOffsets);
- LOG.info("Last processed offsets for the task {}: {}", taskName,
lastProcessedOffsets);
-
initializeStoreDirectories();
}
/**
* Flushes the contents of the underlying store and writes the offset file
to disk.
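- * Synchronized inorder to be exclusive with process()
+ * Not synchronized; callers must ensure this does not run concurrently with side input processing
+ * (e.g. {@link TaskSideInputHandler#flush} is synchronized with {@link TaskSideInputHandler#process}).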
+ *
+ * @param lastProcessedOffsets The last processed offsets for each SSP.
These will be used when writing offsets files
+ * for each store.
*/
- public synchronized void flush() {
+ public void flush(Map<SystemStreamPartition, String> lastProcessedOffsets) {
LOG.info("Flushing the side input stores.");
stores.values().forEach(StorageEngine::flush);
- writeOffsetFiles();
+ writeFileOffsets(lastProcessedOffsets);
}
/**
* Stops the storage engines for all the stores and writes the offset file
to disk.
+ *
+ * @param lastProcessedOffsets The last processed offsets for each SSP.
These will be used when writing offsets files
+ * for each store.
*/
- public void stop() {
+ public void stop(Map<SystemStreamPartition, String> lastProcessedOffsets) {
LOG.info("Stopping the side input stores.");
stores.values().forEach(StorageEngine::stop);
- writeOffsetFiles();
+ writeFileOffsets(lastProcessedOffsets);
}
/**
@@ -162,77 +117,6 @@ public class TaskSideInputStorageManager {
}
/**
- * Gets the starting offset for the given side input {@link
SystemStreamPartition}.
- *
- * Note: The method doesn't respect {@link
org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
- * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET}
configurations. It will use the local offset
- * file if it is valid, else it will fall back to oldest offset in the
stream.
- *
- * @param ssp side input system stream partition to get the starting offset
for
- * @return the starting offset
- */
- public String getStartingOffset(SystemStreamPartition ssp) {
- return startingOffsets.get(ssp);
- }
-
- // Get the taskName associated with this instance.
- public TaskName getTaskName() {
- return this.taskName;
- }
-
- /**
- * Gets the last processed offset for the given side input {@link
SystemStreamPartition}.
- *
- * @param ssp side input system stream partition to get the last processed
offset for
- * @return the last processed offset
- */
- public String getLastProcessedOffset(SystemStreamPartition ssp) {
- return lastProcessedOffsets.get(ssp);
- }
-
- /**
- * For unit testing only
- */
- @VisibleForTesting
- void updateLastProcessedOffset(SystemStreamPartition ssp, String offset) {
- lastProcessedOffsets.put(ssp, offset);
- }
-
- /**
- * Processes the incoming side input message envelope and updates the last
processed offset for its SSP.
- * Synchronized inorder to be exclusive with flush().
- *
- * @param message incoming message to be processed
- */
- public synchronized void process(IncomingMessageEnvelope message) {
- SystemStreamPartition ssp = message.getSystemStreamPartition();
- Set<String> storeNames = sspsToStores.get(ssp);
-
- for (String storeName : storeNames) {
- SideInputsProcessor sideInputsProcessor =
storeToProcessor.get(storeName);
-
- KeyValueStore keyValueStore = (KeyValueStore) stores.get(storeName);
- Collection<Entry<?, ?>> entriesToBeWritten =
sideInputsProcessor.process(message, keyValueStore);
-
- // Iterate over the list to be written.
- // TODO: SAMZA-2255: Optimize value writes in TaskSideInputStorageManager
- for (Entry entry : entriesToBeWritten) {
- // If the key is null we ignore, if the value is null, we issue a
delete, else we issue a put
- if (entry.getKey() != null) {
- if (entry.getValue() != null) {
- keyValueStore.put(entry.getKey(), entry.getValue());
- } else {
- keyValueStore.delete(entry.getKey());
- }
- }
- }
- }
-
- // update the last processed offset
- lastProcessedOffsets.put(ssp, message.getOffset());
- }
-
- /**
* Initializes the store directories for all the stores:
* 1. Cleans up the directories for invalid stores.
* 2. Ensures that the directories exist.
@@ -258,9 +142,10 @@ public class TaskSideInputStorageManager {
/**
* Writes the offset files for all side input stores one by one. There is
one offset file per store.
* Its contents are a JSON encoded mapping from each side input SSP to its
last processed offset, and a checksum.
+ *
+ * @param lastProcessedOffsets The offset per SSP to write
*/
- @VisibleForTesting
- void writeOffsetFiles() {
+ public void writeFileOffsets(Map<SystemStreamPartition, String>
lastProcessedOffsets) {
storeToSSps.entrySet().stream()
.filter(entry -> isPersistedStore(entry.getKey())) // filter out
in-memory side input stores
.forEach((entry) -> {
@@ -283,9 +168,7 @@ public class TaskSideInputStorageManager {
*
* @return a {@link Map} of {@link SystemStreamPartition} to offset in the
offset files.
*/
- @SuppressWarnings("unchecked")
- @VisibleForTesting
- Map<SystemStreamPartition, String> getFileOffsets() {
+ public Map<SystemStreamPartition, String> getFileOffsets() {
LOG.info("Loading initial offsets from the file for side input stores.");
Map<SystemStreamPartition, String> fileOffsets = new HashMap<>();
@@ -313,73 +196,6 @@ public class TaskSideInputStorageManager {
return storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName,
taskName, taskMode);
}
- /**
- * Gets the starting offsets for the {@link SystemStreamPartition}s
belonging to all the side input stores.
- * If the local file offset is available and is greater than the oldest
available offset from source, uses it,
- * else falls back to oldest offset in the source.
- *
- * @param fileOffsets offsets from the local offset file
- * @param oldestOffsets oldest offsets from the source
- * @return a {@link Map} of {@link SystemStreamPartition} to offset
- */
- @VisibleForTesting
- Map<SystemStreamPartition, String> getStartingOffsets(
- Map<SystemStreamPartition, String> fileOffsets,
Map<SystemStreamPartition, String> oldestOffsets) {
- Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
-
- sspsToStores.keySet().forEach(ssp -> {
- String fileOffset = fileOffsets.get(ssp);
- String oldestOffset = oldestOffsets.get(ssp);
-
- startingOffsets.put(ssp,
- storageManagerUtil.getStartingOffset(
- ssp, systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset,
oldestOffset));
- });
-
- return startingOffsets;
- }
-
- /**
- * Gets the oldest offset for the {@link SystemStreamPartition}s associated
with all the store side inputs.
- * 1. Groups the list of the SSPs based on system stream
- * 2. Fetches the {@link SystemStreamMetadata} from {@link
StreamMetadataCache}
- * 3. Fetches the partition metadata for each system stream and fetch the
corresponding partition metadata
- * and populates the oldest offset for SSPs belonging to the system
stream.
- *
- * @return a {@link Map} of {@link SystemStreamPartition} to their oldest
offset. If partitionMetadata could not be
- * obtained for any {@link SystemStreamPartition} the offset for it is
populated as null.
- */
- @VisibleForTesting
- Map<SystemStreamPartition, String> getOldestOffsets() {
- Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
-
- // Step 1
- Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp =
sspsToStores.keySet().stream()
-
.collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
-
- // Step 2
- Map<SystemStream, SystemStreamMetadata> metadata =
JavaConverters.mapAsJavaMapConverter(
- streamMetadataCache.getStreamMetadata(
-
JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(),
false)).asJava();
-
- // Step 3
- metadata.forEach((systemStream, systemStreamMetadata) -> {
-
- // get the partition metadata for each system stream
- Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>
partitionMetadata =
- systemStreamMetadata.getSystemStreamPartitionMetadata();
-
- // For SSPs belonging to the system stream, use the partition metadata
to get the oldest offset
- // if partitionMetadata was not obtained for any SSP, populate
oldest-offset as null
- // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using
lambda will NPE when getOldestOffset() is null
- for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
- oldestOffsets.put(ssp,
partitionMetadata.get(ssp.getPartition()).getOldestOffset());
- }
- });
-
- return oldestOffsets;
- }
-
private boolean isValidSideInputStore(String storeName, File storeLocation) {
return isPersistedStore(storeName)
&& !storageManagerUtil.isStaleStore(storeLocation,
STORE_DELETE_RETENTION_MS, clock.currentTimeMillis(), true)
@@ -393,13 +209,8 @@ public class TaskSideInputStorageManager {
.orElse(false);
}
- private void validateStoreConfiguration() {
+ private void validateStoreConfiguration(Map<String, StorageEngine> stores) {
stores.forEach((storeName, storageEngine) -> {
- if (!storeToProcessor.containsKey(storeName)) {
- throw new SamzaException(
- String.format("Side inputs processor missing for store: %s.",
storeName));
- }
-
if (storageEngine.getStoreProperties().isLoggedStore()) {
throw new SamzaException(
String.format("Cannot configure both side inputs and a changelog
for store: %s.", storeName));
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 6e59e55..3087eb4 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -149,10 +149,10 @@ public class ContainerStorageManager {
private final int maxChangeLogStreamPartitions; // The partition count of
each changelog-stream topic. This is used for validating changelog streams
before restoring.
/* Sideinput related parameters */
- private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map
of sideInput system-streams indexed by store name
- private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
taskSideInputSSPs;
- private final Map<SystemStreamPartition, TaskSideInputStorageManager>
sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp,
for simpler lookup for process()
- private final Map<String, SystemConsumer> sideInputConsumers; // Mapping
from storeSystemNames to SystemConsumers
+ private final boolean hasSideInputs;
+ // side inputs indexed first by task, then store name
+ private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
taskSideInputStoreSSPs;
+ private final Map<SystemStreamPartition, TaskSideInputHandler>
sspSideInputHandlers;
private SystemConsumers sideInputSystemConsumers;
private final Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
= new ConcurrentHashMap<>(); // Recorded sspMetadata of the
taskSideInputSSPs recorded at start, used to determine when sideInputs are
caughtup and container init can proceed
@@ -194,12 +194,16 @@ public class ContainerStorageManager {
Clock clock) {
this.checkpointManager = checkpointManager;
this.containerModel = containerModel;
- this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams);
- this.taskSideInputSSPs = getTaskSideInputSSPs(containerModel,
sideInputSystemStreams);
+ this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel,
sideInputSystemStreams);
+ this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
+ .flatMap(m -> m.values().stream())
+ .flatMap(Collection::stream)
+ .findAny()
+ .isPresent();
this.sspMetadataCache = sspMetadataCache;
this.changelogSystemStreams = getChangelogSystemStreams(containerModel,
changelogSystemStreams); // handling standby tasks
- LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams
= {}", this.changelogSystemStreams, this.sideInputSystemStreams);
+ LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs
= {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
this.storageEngineFactories = storageEngineFactories;
this.serdes = serdes;
@@ -238,25 +242,38 @@ public class ContainerStorageManager {
// create taskStores for all tasks in the containerModel and each store in
storageEngineFactories
this.taskStores = createTaskStores(containerModel, jobContext,
containerContext, storageEngineFactories, serdes, taskInstanceMetrics,
taskInstanceCollectors);
+ Set<String> containerChangelogSystems =
this.changelogSystemStreams.values().stream()
+ .map(SystemStream::getSystem)
+ .collect(Collectors.toSet());
+
// create system consumers (1 per store system in changelogSystemStreams),
and index it by storeName
- Map<String, SystemConsumer> storeSystemConsumers =
createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
- e -> Collections.singleton(e.getValue()))), systemFactories, config,
this.samzaContainerMetrics.registry());
+ Map<String, SystemConsumer> storeSystemConsumers = createConsumers(
+ containerChangelogSystems, systemFactories, config,
this.samzaContainerMetrics.registry());
this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams,
storeSystemConsumers);
// creating task restore managers
this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock,
this.samzaContainerMetrics);
- // create sideInput storage managers
- sideInputStorageManagers = createSideInputStorageManagers(clock);
-
- // create sideInput consumers indexed by systemName
- this.sideInputConsumers = createConsumers(this.sideInputSystemStreams,
systemFactories, config, this.samzaContainerMetrics.registry());
+ this.sspSideInputHandlers = createSideInputHandlers(clock);
// create SystemConsumers for consuming from taskSideInputSSPs, if
sideInputs are being used
- if (sideInputsPresent()) {
+ if (this.hasSideInputs) {
+ Set<SystemStream> containerSideInputSystemStreams =
this.taskSideInputStoreSSPs.values().stream()
+ .flatMap(map -> map.values().stream())
+ .flatMap(Set::stream)
+ .map(SystemStreamPartition::getSystemStream)
+ .collect(Collectors.toSet());
+
+ Set<String> containerSideInputSystems =
containerSideInputSystemStreams.stream()
+ .map(SystemStream::getSystem)
+ .collect(Collectors.toSet());
- scala.collection.immutable.Map<SystemStream, SystemStreamMetadata>
inputStreamMetadata =
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
-
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(),
false);
+ // create sideInput consumers indexed by systemName
+ // Mapping from storeSystemNames to SystemConsumers
+ Map<String, SystemConsumer> sideInputConsumers =
+ createConsumers(containerSideInputSystems, systemFactories, config,
this.samzaContainerMetrics.registry());
+
+ scala.collection.immutable.Map<SystemStream, SystemStreamMetadata>
inputStreamMetadata =
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(),
false);
SystemConsumersMetrics sideInputSystemConsumersMetrics = new
SystemConsumersMetrics(samzaContainerMetrics.registry(),
SIDEINPUTS_METRICS_PREFIX);
// we use the same registry as samza-container-metrics
@@ -265,7 +282,7 @@ public class ContainerStorageManager {
sideInputSystemConsumersMetrics.registry(), systemAdmins);
sideInputSystemConsumers =
- new SystemConsumers(chooser,
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager,
+ new SystemConsumers(chooser,
ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
sideInputSystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
TaskConfig.DEFAULT_POLL_INTERVAL_MS,
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
}
@@ -283,9 +300,9 @@ public class ContainerStorageManager {
Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs =
new HashMap<>();
containerModel.getTasks().forEach((taskName, taskModel) -> {
+ taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
sideInputSystemStreams.keySet().forEach(storeName -> {
Set<SystemStreamPartition> taskSideInputs =
taskModel.getSystemStreamPartitions().stream().filter(ssp ->
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
- taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
});
});
@@ -312,12 +329,11 @@ public class ContainerStorageManager {
);
getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel)
-> {
+ this.taskSideInputStoreSSPs.putIfAbsent(taskName, new HashMap<>());
changelogSystemStreams.forEach((storeName, systemStream) -> {
SystemStreamPartition ssp = new
SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
changelogSSPToStore.remove(ssp);
- this.taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
- this.sideInputSystemStreams.put(storeName,
Collections.singleton(ssp.getSystemStream()));
- this.taskSideInputSSPs.get(taskName).put(storeName,
Collections.singleton(ssp));
+ this.taskSideInputStoreSSPs.get(taskName).put(storeName,
Collections.singleton(ssp));
});
});
@@ -329,28 +345,21 @@ public class ContainerStorageManager {
/**
* Creates SystemConsumer objects for store restoration, creating one
consumer per system.
*/
- private static Map<String, SystemConsumer> createConsumers(Map<String,
Set<SystemStream>> systemStreams,
+ private static Map<String, SystemConsumer> createConsumers(Set<String>
storeSystems,
Map<String, SystemFactory> systemFactories, Config config,
MetricsRegistry registry) {
- // Determine the set of systems being used across all stores
- Set<String> storeSystems =
-
systemStreams.values().stream().flatMap(Set::stream).map(SystemStream::getSystem).collect(Collectors.toSet());
-
// Create one consumer for each system in use, map with one entry for each
such system
- Map<String, SystemConsumer> storeSystemConsumers = new HashMap<>();
-
+ Map<String, SystemConsumer> consumers = new HashMap<>();
// Iterate over the list of storeSystems and create one sysConsumer per
system
for (String storeSystemName : storeSystems) {
SystemFactory systemFactory = systemFactories.get(storeSystemName);
if (systemFactory == null) {
- throw new SamzaException("Changelog system " + storeSystemName + "
does not exist in config");
+ throw new SamzaException("System " + storeSystemName + " does not
exist in config");
}
- storeSystemConsumers.put(storeSystemName,
- systemFactory.getConsumer(storeSystemName, config, registry));
+ consumers.put(storeSystemName,
systemFactory.getConsumer(storeSystemName, config, registry));
}
- return storeSystemConsumers;
-
+ return consumers;
}
private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String,
SystemStream> changelogSystemStreams,
@@ -406,7 +415,7 @@ public class ContainerStorageManager {
for (String storeName : storageEngineFactories.keySet()) {
- StorageEngineFactory.StoreMode storeMode =
this.sideInputSystemStreams.containsKey(storeName) ?
+ StorageEngineFactory.StoreMode storeMode =
this.taskSideInputStoreSSPs.get(taskName).containsKey(storeName) ?
StorageEngineFactory.StoreMode.ReadWrite :
StorageEngineFactory.StoreMode.BulkLoad;
StorageEngine storageEngine =
@@ -476,7 +485,7 @@ public class ContainerStorageManager {
// Use the logged-store-base-directory for change logged stores and
sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeDirectory;
- if (changeLogSystemStreamPartition != null ||
sideInputSystemStreams.containsKey(storeName)) {
+ if (changeLogSystemStreamPartition != null ||
this.taskSideInputStoreSSPs.get(taskName).containsKey(storeName)) {
storeDirectory =
storageManagerUtil.getTaskStoreDir(this.loggedStoreBaseDirectory, storeName,
taskName,
taskModel.getTaskMode());
} else {
@@ -522,15 +531,14 @@ public class ContainerStorageManager {
// Create sideInput store processors, one per store per task
private Map<TaskName, Map<String, SideInputsProcessor>>
createSideInputProcessors(StorageConfig config,
- ContainerModel containerModel, Map<String, Set<SystemStream>>
sideInputSystemStreams,
- Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
+ ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics>
taskInstanceMetrics) {
Map<TaskName, Map<String, SideInputsProcessor>>
sideInputStoresToProcessors = new HashMap<>();
containerModel.getTasks().forEach((taskName, taskModel) -> {
sideInputStoresToProcessors.put(taskName, new HashMap<>());
TaskMode taskMode = taskModel.getTaskMode();
- for (String storeName : sideInputSystemStreams.keySet()) {
+ for (String storeName :
this.taskSideInputStoreSSPs.get(taskName).keySet()) {
SideInputsProcessor sideInputsProcessor;
Optional<String> sideInputsProcessorSerializedInstance =
@@ -583,53 +591,57 @@ public class ContainerStorageManager {
}
// Create task sideInput storage managers, one per task, index by the SSP
they are responsible for consuming
- private Map<SystemStreamPartition, TaskSideInputStorageManager>
createSideInputStorageManagers(Clock clock) {
+ private Map<SystemStreamPartition, TaskSideInputHandler>
createSideInputHandlers(Clock clock) {
// creating sideInput store processors, one per store per task
Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
- createSideInputProcessors(new StorageConfig(config),
this.containerModel, this.sideInputSystemStreams,
- this.taskInstanceMetrics);
+ createSideInputProcessors(new StorageConfig(config),
this.containerModel, this.taskInstanceMetrics);
- Map<SystemStreamPartition, TaskSideInputStorageManager>
sideInputStorageManagers = new HashMap<>();
+ Map<SystemStreamPartition, TaskSideInputHandler> handlers = new
HashMap<>();
- if (sideInputsPresent()) {
+ if (this.hasSideInputs) {
containerModel.getTasks().forEach((taskName, taskModel) -> {
Map<String, StorageEngine> sideInputStores =
getSideInputStores(taskName);
Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new
HashMap<>();
for (String storeName : sideInputStores.keySet()) {
- Set<SystemStreamPartition> storeSSPs =
taskSideInputSSPs.get(taskName).get(storeName);
+ Set<SystemStreamPartition> storeSSPs =
this.taskSideInputStoreSSPs.get(taskName).get(storeName);
sideInputStoresToSSPs.put(storeName, storeSSPs);
}
- TaskSideInputStorageManager taskSideInputStorageManager =
- new TaskSideInputStorageManager(taskName,
taskModel.getTaskMode(), streamMetadataCache,
- loggedStoreBaseDirectory, sideInputStores,
taskSideInputProcessors.get(taskName), sideInputStoresToSSPs,
- systemAdmins, config, clock);
+ TaskSideInputHandler taskSideInputHandler = new
TaskSideInputHandler(taskName,
+ taskModel.getTaskMode(),
+ loggedStoreBaseDirectory,
+ sideInputStores,
+ sideInputStoresToSSPs,
+ taskSideInputProcessors.get(taskName),
+ this.systemAdmins,
+ this.streamMetadataCache,
+ clock);
sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
- sideInputStorageManagers.put(ssp, taskSideInputStorageManager);
+ handlers.put(ssp, taskSideInputHandler);
});
- LOG.info("Created taskSideInputStorageManager for task {},
sideInputStores {} and loggedStoreBaseDirectory {}",
+ LOG.info("Created TaskSideInputHandler for task {}, sideInputStores
{} and loggedStoreBaseDirectory {}",
taskName, sideInputStores, loggedStoreBaseDirectory);
});
}
- return sideInputStorageManagers;
+ return handlers;
}
private Map<String, StorageEngine> getSideInputStores(TaskName taskName) {
return taskStores.get(taskName).entrySet().stream().
- filter(e ->
sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ filter(e ->
this.taskSideInputStoreSSPs.get(taskName).containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
private Map<String, StorageEngine> getNonSideInputStores(TaskName taskName) {
return taskStores.get(taskName).entrySet().stream().
- filter(e ->
!sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ filter(e ->
!this.taskSideInputStoreSSPs.get(taskName).containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}
- private Set<TaskSideInputStorageManager> getSideInputStorageManagers() {
- return
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
+ private Set<TaskSideInputHandler> getSideInputHandlers() {
+ return
this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
}
public void start() throws SamzaException, InterruptedException {
@@ -651,7 +663,7 @@ public class ContainerStorageManager {
}
LOG.info("Checkpointed changelog ssp offsets: {}",
checkpointedChangelogSSPOffsets);
restoreStores(checkpointedChangelogSSPOffsets);
- if (sideInputsPresent()) {
+ if (this.hasSideInputs) {
startSideInputs();
}
}
@@ -714,7 +726,7 @@ public class ContainerStorageManager {
LOG.info("SideInput Restore started");
// initialize the sideInputStorageManagers
- getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.init());
+ getSideInputHandlers().forEach(TaskSideInputHandler::init);
// start the checkpointing thread at the commit-ms frequency
TaskConfig taskConfig = new TaskConfig(config);
@@ -722,7 +734,7 @@ public class ContainerStorageManager {
@Override
public void run() {
try {
- getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.flush());
+ getSideInputHandlers().forEach(TaskSideInputHandler::flush);
} catch (Exception e) {
LOG.error("Exception during flushing sideInputs", e);
sideInputException = e;
@@ -731,11 +743,11 @@ public class ContainerStorageManager {
}, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
// set the latch to the number of sideInput SSPs
- this.sideInputsCaughtUp = new
CountDownLatch(this.sideInputStorageManagers.keySet().size());
+ this.sideInputsCaughtUp = new
CountDownLatch(this.sspSideInputHandlers.keySet().size());
// register all sideInput SSPs with the consumers
- for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
- String startingOffset =
sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
+ for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
+ String startingOffset =
this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp);
if (startingOffset == null) {
throw new SamzaException(
@@ -744,8 +756,8 @@ public class ContainerStorageManager {
// register startingOffset with the sysConsumer and register a metric
for it
sideInputSystemConsumers.register(ssp, startingOffset);
-
taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
- ssp, ScalaJavaUtil.toScalaFunction(() ->
sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
+
taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+ ssp, ScalaJavaUtil.toScalaFunction(() ->
this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
SystemStreamMetadata systemStreamMetadata =
streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
@@ -772,7 +784,7 @@ public class ContainerStorageManager {
if (envelope != null) {
if (!envelope.isEndOfStream()) {
-
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
+
this.sspSideInputHandlers.get(envelope.getSystemStreamPartition()).process(envelope);
}
checkSideInputCaughtUp(envelope.getSystemStreamPartition(),
envelope.getOffset(),
@@ -812,10 +824,6 @@ public class ContainerStorageManager {
LOG.info("SideInput Restore complete");
}
- private boolean sideInputsPresent() {
- return !this.sideInputSystemStreams.isEmpty();
- }
-
// Method to check if the given offset means the stream is caught up for
reads
private void checkSideInputCaughtUp(SystemStreamPartition ssp, String
offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
@@ -892,7 +900,7 @@ public class ContainerStorageManager {
this.shouldShutdown = true;
// stop all sideinput consumers and stores
- if (sideInputsPresent()) {
+ if (this.hasSideInputs) {
sideInputsReadExecutor.shutdownNow();
this.sideInputSystemConsumers.stop();
@@ -907,7 +915,7 @@ public class ContainerStorageManager {
}
// stop all sideInputStores -- this will perform one last flush on the
KV stores, and write the offset file
- this.getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.stop());
+ this.getSideInputHandlers().forEach(TaskSideInputHandler::stop);
}
LOG.info("Shutdown complete");
}
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
new file mode 100644
index 0000000..75562db
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+  /**
+   * Verifies that {@link TaskSideInputHandler#getStartingOffset} yields null when the stream metadata returned by
+   * the system admin (e.g., KafkaSystemAdmin) carries a null oldest offset for the partition.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+    // partition metadata with a null first (oldest) offset
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    handler.init();
+
+    ssps.forEach(ssp -> {
+      String startingOffset = handler.getStartingOffset(
+          new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, ssp.getPartition()));
+      Assert.assertNull("Starting offset should be null", startingOffset);
+    });
+  }
+
+  /**
+   * Verifies that {@link TaskSideInputHandler#getStartingOffsets} picks, per SSP, the larger of the file offset and
+   * the oldest available offset, advancing past the file offset when that one wins.
+   */
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+          int partitionId = ssp.getPartition().getPartitionId();
+          int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+          return String.valueOf(offset);
+        }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+          int partitionId = ssp.getPartition().getPartitionId();
+          int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+          return String.valueOf(offset);
+        }));
+
+    // handler is a Mockito spy (see MockTaskSideInputHandlerBuilder#build), so real methods already run by default;
+    // this stub only makes the intent explicit.
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertEquals("Failed to get starting offsets for all ssps", ssps.size(), startingOffsets.size());
+    startingOffsets.forEach((ssp, offset) -> {
+      int partitionId = ssp.getPartition().getPartitionId();
+      String expectedOffset = partitionId % 2 == 0
+          // file offset is larger: resume from the offset right after it
+          ? getOffsetAfter(String.valueOf(partitionId + 10))
+          // oldest offset is larger: start from the oldest offset
+          : String.valueOf(partitionId + 10);
+      assertEquals("Larger of fileOffsets and oldestOffsets should always be chosen", expectedOffset, offset);
+    });
+  }
+
+  /**
+   * Builds a Mockito spy of {@link TaskSideInputHandler} wired to mocked collaborators.
+   *
+   * <p>The mocked {@link SystemAdmin} registered for {@code TEST_SYSTEM} compares offsets numerically and maps each
+   * offset to its numeric successor in {@code getOffsetsAfter}. The mocked {@link StreamMetadataCache} returns an
+   * empty metadata map unless {@link #addStreamMetadata} overrides it. {@link #addStore} registers only the store's
+   * SSPs and a mocked processor; the {@code stores} map is left empty.
+   */
+  private static final class MockTaskSideInputHandlerBuilder {
+    final TaskName taskName;
+    final TaskMode taskMode;
+    final File storeBaseDir;
+
+    final Map<String, StorageEngine> stores = new HashMap<>();
+    final Map<String, Set<SystemStreamPartition>> storeToSSPs = new HashMap<>();
+    final Clock clock = mock(Clock.class);
+    final Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>();
+    final StreamMetadataCache streamMetadataCache = mock(StreamMetadataCache.class);
+    final SystemAdmins systemAdmins = mock(SystemAdmins.class);
+
+    public MockTaskSideInputHandlerBuilder(String taskName, TaskMode taskMode) {
+      this.taskName = new TaskName(taskName);
+      this.taskMode = taskMode;
+      this.storeBaseDir = mock(File.class);
+
+      initializeMocks();
+    }
+
+    private void initializeMocks() {
+      SystemAdmin admin = mock(SystemAdmin.class);
+      // offsets in these tests are decimal longs, so compare them numerically
+      doAnswer(invocation -> {
+        String offset1 = invocation.getArgumentAt(0, String.class);
+        String offset2 = invocation.getArgumentAt(1, String.class);
+
+        return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
+      }).when(admin).offsetComparator(any(), any());
+      // the "next" offset is simply offset + 1
+      doAnswer(invocation -> {
+        Map<SystemStreamPartition, String> sspToOffsets = invocation.getArgumentAt(0, Map.class);
+
+        return sspToOffsets.entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey,
+                entry -> getOffsetAfter(entry.getValue())));
+      }).when(admin).getOffsetsAfter(any());
+      doReturn(admin).when(systemAdmins).getSystemAdmin(TEST_SYSTEM);
+      doReturn(ScalaJavaUtil.toScalaMap(new HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+    }
+
+    /** Makes the mocked metadata cache return {@code streamMetadata} for any lookup. */
+    MockTaskSideInputHandlerBuilder addStreamMetadata(Map<SystemStream, SystemStreamMetadata> streamMetadata) {
+      doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+      return this;
+    }
+
+    /** Registers a side-input store consuming {@code storeSSPs}, backed by a mocked {@link SideInputsProcessor}. */
+    MockTaskSideInputHandlerBuilder addStore(String storeName, Set<SystemStreamPartition> storeSSPs) {
+      storeToSSPs.put(storeName, storeSSPs);
+      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
+      return this;
+    }
+
+    /** Creates the handler under test, wrapped in a Mockito spy so individual methods can be stubbed or verified. */
+    TaskSideInputHandler build() {
+      return spy(new TaskSideInputHandler(taskName,
+          taskMode,
+          storeBaseDir,
+          stores,
+          storeToSSPs,
+          storeToProcessor,
+          systemAdmins,
+          streamMetadataCache,
+          clock));
+    }
+  }
+
+  /** Returns the numeric successor of {@code offset}, mirroring the mocked {@code SystemAdmin#getOffsetsAfter}. */
+  private static String getOffsetAfter(String offset) {
+    return String.valueOf(Long.parseLong(offset) + 1);
+  }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index 6761702..9c41e85 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.storage;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.util.Collections;
@@ -29,18 +30,10 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
-import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -72,6 +65,7 @@ public class TestTaskSideInputStorageManager {
final String taskName = "test-flush-task";
final SystemStreamPartition ssp = new SystemStreamPartition("test-system",
"test-stream", new Partition(0));
final String offset = "123";
+ final ImmutableMap<SystemStreamPartition, String> processedOffsets =
ImmutableMap.of(ssp, offset);
TaskSideInputStorageManager testSideInputStorageManager = new
MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
.addLoggedStore(storeName, ImmutableSet.of(ssp))
@@ -79,14 +73,13 @@ public class TestTaskSideInputStorageManager {
Map<String, StorageEngine> stores = new HashMap<>();
initializeSideInputStorageManager(testSideInputStorageManager);
- testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
- testSideInputStorageManager.flush();
+ testSideInputStorageManager.flush(processedOffsets);
for (StorageEngine storageEngine : stores.values()) {
verify(storageEngine).flush();
}
- verify(testSideInputStorageManager).writeOffsetFiles();
+ verify(testSideInputStorageManager).writeFileOffsets(eq(processedOffsets));
File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
assertTrue("Store directory: " + storeDir.getPath() + " is missing.",
storeDir.exists());
@@ -100,16 +93,19 @@ public class TestTaskSideInputStorageManager {
public void testStop() {
final String storeName = "test-stop-store";
final String taskName = "test-stop-task";
+ final SystemStreamPartition ssp = new SystemStreamPartition("test-system",
"test-stream", new Partition(0));
+ final String offset = "123";
+ final ImmutableMap<SystemStreamPartition, String> processedOffsets =
ImmutableMap.of(ssp, offset);
TaskSideInputStorageManager testSideInputStorageManager = new
MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
.addInMemoryStore(storeName, ImmutableSet.of())
.build();
initializeSideInputStorageManager(testSideInputStorageManager);
- testSideInputStorageManager.stop();
+ testSideInputStorageManager.stop(processedOffsets);
verify(testSideInputStorageManager.getStore(storeName)).stop();
- verify(testSideInputStorageManager).writeOffsetFiles();
+ verify(testSideInputStorageManager).writeFileOffsets(eq(processedOffsets));
}
@Test
@@ -122,7 +118,7 @@ public class TestTaskSideInputStorageManager {
.build();
initializeSideInputStorageManager(testSideInputStorageManager);
- testSideInputStorageManager.writeOffsetFiles(); // should be no-op
+ testSideInputStorageManager.writeFileOffsets(Collections.emptyMap()); //
should be no-op
File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
assertFalse("Store directory: " + storeDir.getPath() + " should not be
created for non-persisted store", storeDir.exists());
@@ -138,15 +134,15 @@ public class TestTaskSideInputStorageManager {
final SystemStreamPartition ssp = new SystemStreamPartition("test-system",
"test-stream", new Partition(0));
final SystemStreamPartition ssp2 = new
SystemStreamPartition("test-system2", "test-stream2", new Partition(0));
+ Map<SystemStreamPartition, String> processedOffsets = ImmutableMap.of(ssp,
offset, ssp2, offset);
+
TaskSideInputStorageManager testSideInputStorageManager = new
MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
.addLoggedStore(storeName, ImmutableSet.of(ssp))
.addLoggedStore(storeName2, ImmutableSet.of(ssp2))
.build();
initializeSideInputStorageManager(testSideInputStorageManager);
- testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
- testSideInputStorageManager.updateLastProcessedOffset(ssp2, offset);
- testSideInputStorageManager.writeOffsetFiles();
+ testSideInputStorageManager.writeFileOffsets(processedOffsets);
File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
assertTrue("Store directory: " + storeDir.getPath() + " is missing.",
storeDir.exists());
@@ -175,87 +171,19 @@ public class TestTaskSideInputStorageManager {
.build();
initializeSideInputStorageManager(testSideInputStorageManager);
- ssps.forEach(ssp ->
testSideInputStorageManager.updateLastProcessedOffset(ssp, offset));
- testSideInputStorageManager.writeOffsetFiles();
+ Map<SystemStreamPartition, String> processedOffsets = ssps.stream()
+ .collect(Collectors.toMap(Function.identity(), ssp -> offset));
- Map<SystemStreamPartition, String> fileOffsets =
testSideInputStorageManager.getFileOffsets();
+ testSideInputStorageManager.writeFileOffsets(processedOffsets);
+ Map<SystemStreamPartition, String> fileOffsets =
testSideInputStorageManager.getFileOffsets();
ssps.forEach(ssp -> {
assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from
file.", fileOffsets.containsKey(ssp));
assertEquals("Mismatch between last processed offset and file
offset.", fileOffsets.get(ssp), offset);
});
}
- /**
- * This test is for cases, when calls to systemAdmin (e.g.,
KafkaSystemAdmin's) get-stream-metadata method return null.
- */
- @Test
- public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
- final String storeName = "test-get-starting-offset-store";
- final String taskName = "test-get-starting-offset-task";
-
- Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
- .mapToObj(idx -> new SystemStreamPartition("test-system",
"test-stream", new Partition(idx)))
- .collect(Collectors.toSet());
- Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata>
partitionMetadata = ssps.stream()
- .collect(Collectors.toMap(SystemStreamPartition::getPartition,
- x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null,
"1", "2")));
-
-
- TaskSideInputStorageManager testSideInputStorageManager = new
MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
- .addLoggedStore(storeName, ssps)
- .addStreamMetadata(Collections.singletonMap(new
SystemStream("test-system", "test-stream"),
- new SystemStreamMetadata("test-stream", partitionMetadata)))
- .build();
-
- initializeSideInputStorageManager(testSideInputStorageManager);
- ssps.forEach(ssp -> {
- String startingOffset = testSideInputStorageManager.getStartingOffset(
- new SystemStreamPartition("test-system", "test-stream",
ssp.getPartition()));
- Assert.assertNull("Starting offset should be null", startingOffset);
- });
- }
-
- @Test
- public void testGetStartingOffsets() {
- final String storeName = "test-get-starting-offset-store";
- final String taskName = "test-get-starting-offset-task";
-
- Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
- .mapToObj(idx -> new SystemStreamPartition("test-system",
"test-stream", new Partition(idx)))
- .collect(Collectors.toSet());
-
-
- TaskSideInputStorageManager testSideInputStorageManager = new
MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
- .addLoggedStore(storeName, ssps)
- .build();
-
- initializeSideInputStorageManager(testSideInputStorageManager);
- Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
- .collect(Collectors.toMap(Function.identity(), ssp -> {
- int partitionId = ssp.getPartition().getPartitionId();
- int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
- return String.valueOf(offset);
- }));
-
- Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
- .collect(Collectors.toMap(Function.identity(), ssp -> {
- int partitionId = ssp.getPartition().getPartitionId();
- int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
-
- return String.valueOf(offset);
- }));
-
-
doCallRealMethod().when(testSideInputStorageManager).getStartingOffsets(fileOffsets,
oldestOffsets);
-
- Map<SystemStreamPartition, String> startingOffsets =
- testSideInputStorageManager.getStartingOffsets(fileOffsets,
oldestOffsets);
-
- assertTrue("Failed to get starting offsets for all ssps",
startingOffsets.size() == 5);
- }
-
private void initializeSideInputStorageManager(TaskSideInputStorageManager
testSideInputStorageManager) {
- doReturn(new
HashMap<>()).when(testSideInputStorageManager).getStartingOffsets(any(), any());
testSideInputStorageManager.init();
}
@@ -263,39 +191,13 @@ public class TestTaskSideInputStorageManager {
private final TaskName taskName;
private final String storeBaseDir;
- private Clock clock = mock(Clock.class);
- private Map<String, SideInputsProcessor> storeToProcessor = new
HashMap<>();
private Map<String, StorageEngine> stores = new HashMap<>();
private Map<String, Set<SystemStreamPartition>> storeToSSps = new
HashMap<>();
- private StreamMetadataCache streamMetadataCache =
mock(StreamMetadataCache.class);
- private SystemAdmins systemAdmins = mock(SystemAdmins.class);
+ private Clock clock = mock(Clock.class);
public MockTaskSideInputStorageManagerBuilder(String taskName, String
storeBaseDir) {
this.taskName = new TaskName(taskName);
this.storeBaseDir = storeBaseDir;
-
- initializeMocks();
- }
-
- private void initializeMocks() {
- SystemAdmin admin = mock(SystemAdmin.class);
- doAnswer(invocation -> {
- String offset1 = invocation.getArgumentAt(0, String.class);
- String offset2 = invocation.getArgumentAt(1, String.class);
-
- return Long.compare(Long.parseLong(offset1),
Long.parseLong(offset2));
- }).when(admin).offsetComparator(any(), any());
- doAnswer(invocation -> {
- Map<SystemStreamPartition, String> sspToOffsets =
invocation.getArgumentAt(0, Map.class);
-
- return sspToOffsets.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- entry -> String.valueOf(Long.parseLong(entry.getValue()) +
1)));
- }).when(admin).getOffsetsAfter(any());
- doReturn(admin).when(systemAdmins).getSystemAdmin("test-system");
-
- doReturn(ScalaJavaUtil.toScalaMap(new
HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
}
MockTaskSideInputStorageManagerBuilder addInMemoryStore(String storeName,
Set<SystemStreamPartition> ssps) {
@@ -304,32 +206,25 @@ public class TestTaskSideInputStorageManager {
new
StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(false).build());
stores.put(storeName, storageEngine);
- storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
storeToSSps.put(storeName, ssps);
return this;
}
- MockTaskSideInputStorageManagerBuilder addStreamMetadata(Map<SystemStream,
SystemStreamMetadata> streamMetadata) {
-
doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(),
anyBoolean());
- return this;
- }
-
MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName,
Set<SystemStreamPartition> ssps) {
StorageEngine storageEngine = mock(StorageEngine.class);
when(storageEngine.getStoreProperties()).thenReturn(
new
StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(true).build());
stores.put(storeName, storageEngine);
- storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
storeToSSps.put(storeName, ssps);
return this;
}
TaskSideInputStorageManager build() {
- return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active,
streamMetadataCache, new File(storeBaseDir), stores,
- storeToProcessor, storeToSSps, systemAdmins, mock(Config.class),
clock));
+ return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active,
new File(storeBaseDir), stores, storeToSSps,
+ clock));
}
}
}
\ No newline at end of file