Repository: samza Updated Branches: refs/heads/master 9f8d593c4 -> fa56b15dc
SAMZA-1773: Side inputs for local stores prateekm vjagadish Please take a look. I will update the PR with the unit tests for SideInputStorageManager and the util functions. Author: Bharath Kumarasubramanian <[email protected]> Author: Prateek Maheshwari <[email protected]> Author: Prateek Maheshwari <[email protected]> Reviewers: Cameron Lee <[email protected]>, Jagadish Venkatraman <[email protected]>, Shanthoosh Venkatraman <[email protected]> Closes #570 from bharathkk/side-input-v3 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa56b15d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa56b15d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa56b15d Branch: refs/heads/master Commit: fa56b15dc55f006cda57eec7d8dfc999db8b94fb Parents: 9f8d593 Author: Bharath Kumarasubramanian <[email protected]> Authored: Tue Jul 24 09:01:09 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Tue Jul 24 09:01:09 2018 -0700 ---------------------------------------------------------------------- .../samza/storage/SideInputsProcessor.java | 46 +++ .../storage/SideInputsProcessorFactory.java | 45 +++ .../java/org/apache/samza/table/TableSpec.java | 44 ++- .../apache/samza/config/JavaStorageConfig.java | 47 +++ .../apache/samza/container/TaskContextImpl.java | 15 +- .../org/apache/samza/execution/JobNode.java | 20 +- .../samza/storage/StorageManagerUtil.java | 138 +++++++ .../storage/TaskSideInputStorageManager.java | 379 +++++++++++++++++++ .../org/apache/samza/config/StorageConfig.scala | 12 + .../org/apache/samza/config/StreamConfig.scala | 2 +- .../apache/samza/container/SamzaContainer.scala | 66 +++- .../apache/samza/container/TaskInstance.scala | 130 +++++-- .../samza/storage/TaskStorageManager.scala | 96 +---- .../scala/org/apache/samza/util/FileUtil.scala | 8 +- .../org/apache/samza/util/ScalaJavaUtil.scala | 12 + .../main/scala/org/apache/samza/util/Util.scala | 20 + .../org/apache/samza/task/TestAsyncRunLoop.java | 2 +- .../samza/container/TestTaskInstance.scala | 5 +- .../samza/storage/TestTaskStorageManager.scala | 25 +- .../kv/inmemory/InMemoryTableDescriptor.java | 3 +- .../storage/kv/RocksDbTableDescriptor.java | 3 +- .../kv/BaseLocalStoreBackedTableDescriptor.java | 21 + .../kv/BaseLocalStoreBackedTableProvider.java | 12 + 23 files changed, 983 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java new file mode 100644 index 0000000..d2dcac5 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import java.io.Serializable; +import java.util.Collection; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; + + +/** + * The processing logic for store side inputs. Accepts incoming messages from side input streams + * and the current store contents, and returns the new key-value entries to be written to the store. + */ +@FunctionalInterface [email protected] +public interface SideInputsProcessor extends Serializable { + + /** + * Process the incoming side input message for the {@code store}. + * + * @param message incoming message envelope + * @param store the store associated with the incoming message envelope + * @return a {@link Collection} of {@link Entry}s that will be written to the {@code store}. + */ + Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store); +} http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java new file mode 100644 index 0000000..fb7fc2c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/SideInputsProcessorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import java.io.Serializable; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; + + +/** + * A factory to build {@link SideInputsProcessor}s. + * + * Implementations should return a new instance for every invocation of + * {@link #getSideInputsProcessor(Config, MetricsRegistry)} + */ +@FunctionalInterface [email protected] +public interface SideInputsProcessorFactory extends Serializable { + /** + * Creates a new instance of a {@link SideInputsProcessor}. 
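As an illustration of the two new interfaces, a minimal factory that copies every side input message into the store under its incoming key could look like the sketch below. This is not part of the patch: the com.example package and class name are made up, and a real processor would typically apply a serde or projection before writing.

    package com.example;  // hypothetical

    import java.util.ArrayList;
    import java.util.Collection;
    import org.apache.samza.config.Config;
    import org.apache.samza.metrics.MetricsRegistry;
    import org.apache.samza.storage.SideInputsProcessor;
    import org.apache.samza.storage.SideInputsProcessorFactory;
    import org.apache.samza.storage.kv.Entry;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;

    public class CopySideInputsProcessorFactory implements SideInputsProcessorFactory {
      @Override
      public SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry) {
        // An anonymous class (rather than a lambda) so that every invocation really does
        // return a new instance, as the factory contract asks.
        return new SideInputsProcessor() {
          @Override
          public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
            // Write the message through to the store as-is; the current store contents are
            // also available here for read-modify-write style processors.
            Collection<Entry<?, ?>> updates = new ArrayList<>();
            updates.add(new Entry<>(message.getKey(), message.getMessage()));
            return updates;
          }
        };
      }
    }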
+ * + * @param config the configuration + * @param metricsRegistry the metrics registry + * @return an instance of {@link SideInputsProcessor} + */ + SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry); +} http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-api/src/main/java/org/apache/samza/table/TableSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java index ba57c2f..82577c4 100644 --- a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java +++ b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java @@ -21,10 +21,12 @@ package org.apache.samza.table; import java.io.Serializable; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.serializers.KVSerde; +import org.apache.samza.storage.SideInputsProcessor; /** @@ -52,6 +54,8 @@ public class TableSpec implements Serializable { * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis */ private transient final KVSerde serde; + private transient final List<String> sideInputs; + private transient final SideInputsProcessor sideInputsProcessor; private transient final Map<String, String> config = new HashMap<>(); /** @@ -61,6 +65,8 @@ public class TableSpec implements Serializable { this.id = null; this.serde = null; this.tableProviderFactoryClassName = null; + this.sideInputs = null; + this.sideInputsProcessor = null; } /** @@ -71,12 +77,28 @@ public class TableSpec implements Serializable { * @param serde the serde * @param config implementation specific configuration */ - public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, - Map<String, String> config) { + public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config) { + this(tableId, serde, tableProviderFactoryClassName, config, Collections.emptyList(), null); + } + + /** + * Constructs a {@link TableSpec} + * + * @param tableId Id of the table + * @param tableProviderFactoryClassName table provider factory + * @param serde the serde + * @param config implementation specific configuration + * @param sideInputs list of side inputs for the table + * @param sideInputsProcessor side input processor for the table + */ + public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName, Map<String, String> config, + List<String> sideInputs, SideInputsProcessor sideInputsProcessor) { this.id = tableId; this.serde = serde; this.tableProviderFactoryClassName = tableProviderFactoryClassName; this.config.putAll(config); + this.sideInputs = sideInputs; + this.sideInputsProcessor = sideInputsProcessor; } /** @@ -113,6 +135,24 @@ public class TableSpec implements Serializable { return Collections.unmodifiableMap(config); } + /** + * Get the list of side inputs for the table. + * + * @return a {@link List} of side input streams + */ + public List<String> getSideInputs() { + return sideInputs; + } + + /** + * Get the {@link SideInputsProcessor} associated with the table. 
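To show how the new fields travel with the spec, here is a hedged fragment constructing a TableSpec for a side-input-backed local table. The table id, stream name and provider factory class are hypothetical, and in practice the framework (via table descriptors) builds TableSpecs rather than user code.

    SideInputsProcessor copyProcessor = new com.example.CopySideInputsProcessorFactory()
        .getSideInputsProcessor(new MapConfig(), new MetricsRegistryMap());

    TableSpec spec = new TableSpec(
        "profile-table",                                    // tableId
        KVSerde.of(new StringSerde(), new StringSerde()),   // serde
        "com.example.MyTableProviderFactory",               // tableProviderFactoryClassName (hypothetical)
        Collections.emptyMap(),                             // implementation specific config
        Collections.singletonList("kafka.profile-updates"), // side inputs
        copyProcessor);                                      // side inputs processor

    spec.getSideInputs();           // => ["kafka.profile-updates"]
    spec.getSideInputsProcessor();  // => copyProcessor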
+ * + * @return a {@link SideInputsProcessor} + */ + public SideInputsProcessor getSideInputsProcessor() { + return sideInputsProcessor; + } + @Override public boolean equals(Object o) { if (this == o) { http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java index bbf0ccf..1e3dbff 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -20,7 +20,11 @@ package org.apache.samza.config; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.execution.StreamManager; @@ -43,6 +47,10 @@ public class JavaStorageConfig extends MapConfig { private static final String ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled"; private static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50; + public static final String SIDE_INPUTS = "stores.%s.side.inputs"; + public static final String SIDE_INPUTS_PROCESSOR_FACTORY = "stores.%s.side.inputs.processor.factory"; + public static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = "stores.%s.side.inputs.processor.serialized.instance"; + public JavaStorageConfig(Config config) { super(config); } @@ -126,4 +134,43 @@ public class JavaStorageConfig extends MapConfig { public String getChangelogSystem() { return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null)); } + + /** + * Gets the side inputs for the store. A store can have multiple side input streams which can be + * provided as a comma separated list. + * + * Each side input must either be a {@code streamId}, or of the format {@code systemName.streamName}. + * E.g. {@code stores.storeName.side.inputs = kafka.topicA, mySystem.topicB} + * + * @param storeName name of the store + * @return a list of side input streams for the store, or an empty list if it has none. + */ + public List<String> getSideInputs(String storeName) { + return Optional.ofNullable(get(String.format(SIDE_INPUTS, storeName), null)) + .map(inputs -> Stream.of(inputs.split(",")) + .map(String::trim) + .filter(input -> !input.isEmpty()) + .collect(Collectors.toList())) + .orElse(Collections.emptyList()); + } + + /** + * Gets the SideInputsProcessorFactory associated with the {@code storeName}. + * + * @param storeName name of the store + * @return the class name of SideInputsProcessorFactory if present, null otherwise + */ + public String getSideInputsProcessorFactory(String storeName) { + return get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName), null); + } + + /** + * Gets the serialized instance of SideInputsProcessor associated with the {@code storeName}. 
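A hedged example of how these keys fit together and are read back; the store and stream names are hypothetical, and the factory class is the illustrative one sketched earlier:

    Map<String, String> props = new HashMap<>();
    props.put("stores.profile-store.side.inputs", "kafka.profile-updates, member-changes");
    props.put("stores.profile-store.side.inputs.processor.factory",
        "com.example.CopySideInputsProcessorFactory");

    JavaStorageConfig storageConfig = new JavaStorageConfig(new MapConfig(props));
    storageConfig.getSideInputs("profile-store");
    // => ["kafka.profile-updates", "member-changes"] (entries are trimmed, empties dropped)
    storageConfig.getSideInputsProcessorFactory("profile-store");
    // => "com.example.CopySideInputsProcessorFactory"
    storageConfig.getSideInputsProcessorSerializedInstance("profile-store");
    // => null, since no serialized processor instance was configured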
+ * + * @param storeName name of the store + * @return the serialized instance of SideInputsProcessor if present, null otherwise + */ + public String getSideInputsProcessorSerializedInstance(String storeName) { + return get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName), null); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index 0d76a33..d65be4c 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -20,10 +20,10 @@ package org.apache.samza.container; import com.google.common.collect.ImmutableSet; +import java.util.function.Function; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.ReadableMetricsRegistry; -import org.apache.samza.storage.TaskStorageManager; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStreamPartition; @@ -48,7 +48,7 @@ public class TaskContextImpl implements TaskContext { private final SamzaContainerContext containerContext; private final Set<SystemStreamPartition> systemStreamPartitions; private final OffsetManager offsetManager; - private final TaskStorageManager storageManager; + private final Function<String, KeyValueStore> kvStoreSupplier; private final TableManager tableManager; private final JobModel jobModel; private final StreamMetadataCache streamMetadataCache; @@ -62,7 +62,7 @@ public class TaskContextImpl implements TaskContext { SamzaContainerContext containerContext, Set<SystemStreamPartition> systemStreamPartitions, OffsetManager offsetManager, - TaskStorageManager storageManager, + Function<String, KeyValueStore> kvStoreSupplier, TableManager tableManager, JobModel jobModel, StreamMetadataCache streamMetadataCache, @@ -72,7 +72,7 @@ public class TaskContextImpl implements TaskContext { this.containerContext = containerContext; this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions); this.offsetManager = offsetManager; - this.storageManager = storageManager; + this.kvStoreSupplier = kvStoreSupplier; this.tableManager = tableManager; this.jobModel = jobModel; this.streamMetadataCache = streamMetadataCache; @@ -91,12 +91,11 @@ public class TaskContextImpl implements TaskContext { @Override public KeyValueStore getStore(String storeName) { - if (storageManager != null) { - return (KeyValueStore) storageManager.apply(storeName); - } else { + KeyValueStore store = kvStoreSupplier.apply(storeName); + if (store == null) { LOG.warn("No store found for name: {}", storeName); - return null; } + return store; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index 6507996..288b1a1 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -140,7 
+140,7 @@ public class JobNode { inputs.add(formattedSystemStream); } } - configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); + if (!broadcasts.isEmpty()) { // TODO: remove this once we support defining broadcast input stream in high-level // task.broadcast.input should be generated by the planner in the future. @@ -179,6 +179,24 @@ public class JobNode { configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables)); + // Add side inputs to the inputs and mark the stream as bootstrap + tables.forEach(tableSpec -> { + List<String> sideInputs = tableSpec.getSideInputs(); + if (sideInputs != null && !sideInputs.isEmpty()) { + sideInputs.stream() + .map(sideInput -> Util.getSystemStreamFromNameOrId(config, sideInput)) + .forEach(systemStream -> { + inputs.add(Util.getNameFromSystemStream(systemStream)); + configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(), + systemStream.getSystem(), systemStream.getStream()), "true"); + }); + } + }); + + configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); + + log.info("Job {} has generated configs {}", jobName, configs); + String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); // Disallow user specified job inputs/outputs. This info comes strictly from the user application. http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java new file mode 100644 index 0000000..731a84d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import com.google.common.collect.ImmutableMap; +import java.io.File; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StorageManagerUtil { + private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class); + + /** + * Fetch the starting offset for the input {@link SystemStreamPartition} + * + * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT()} and + * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET()} configurations. It will use the locally + * checkpointed offset if it is valid, or fall back to oldest offset of the stream. 
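To make the offset resolution concrete, a hedged walk-through with made-up values: the offsets are Kafka-style numeric offsets, the stream name is hypothetical, and a SystemAdmins instance is assumed to be in scope.

    SystemStreamPartition ssp =
        new SystemStreamPartition("kafka", "profile-updates", new Partition(0));
    SystemAdmin admin = systemAdmins.getSystemAdmin("kafka");

    // "41" was the last offset reflected in the local store; "30" is the oldest offset upstream.
    String startingOffset = StorageManagerUtil.getStartingOffset(ssp, admin, "41", "30");
    // getOffsetsAfter resolves "41" to "42", and offsetComparator("30", "42") <= 0, so
    // consumption resumes at "42". Had retention already advanced the oldest offset past "42",
    // the oldest offset would be returned instead and the gap could not be restored.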
+ * + * @param ssp system stream partition for which starting offset is requested + * @param admin system admin associated with the ssp + * @param fileOffset local file offset for the ssp + * @param oldestOffset oldest offset for the ssp from the source + * @return starting offset for the incoming {@link SystemStreamPartition} + */ + public static String getStartingOffset( + SystemStreamPartition ssp, SystemAdmin admin, String fileOffset, String oldestOffset) { + String startingOffset = oldestOffset; + if (fileOffset != null) { + // File offset was the last message written to the local checkpoint that is also reflected in the store, + // so we start with the NEXT offset + String resumeOffset = admin.getOffsetsAfter(ImmutableMap.of(ssp, fileOffset)).get(ssp); + if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) { + startingOffset = resumeOffset; + } else { + // If the offset we plan to use is older than the oldest offset, just use the oldest offset. + // This can happen with source of the store(changelog, etc) configured with a TTL cleanup policy + LOG.warn("Local store offset {} is lower than the oldest offset {} of the source stream." + + " The values between these offsets cannot be restored.", resumeOffset, oldestOffset); + } + } + + return startingOffset; + } + + /** + * Checks if the store is stale. If the time elapsed since the last modified time of the offset file is greater than + * the {@code storeDeleteRetentionInMs}, then the store is considered stale. + * + * @param storeDir the base directory of the store + * @param offsetFileName the offset file name + * @param storeDeleteRetentionInMs store delete retention in millis + * @param currentTimeMs current time in ms + * @return true if the store is stale, false otherwise + */ + public static boolean isStaleStore( + File storeDir, String offsetFileName, long storeDeleteRetentionInMs, long currentTimeMs) { + boolean isStaleStore = false; + String storePath = storeDir.toPath().toString(); + if (storeDir.exists()) { + File offsetFileRef = new File(storeDir, offsetFileName); + long offsetFileLastModifiedTime = offsetFileRef.lastModified(); + if ((currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs) { + LOG.info( + String.format("Store: %s is stale since lastModifiedTime of offset file: %d, is older than store deleteRetentionMs: %d.", + storePath, offsetFileLastModifiedTime, storeDeleteRetentionInMs)); + isStaleStore = true; + } + } else { + LOG.info("Storage partition directory: {} does not exist.", storePath); + } + return isStaleStore; + } + + /** + * An offset file associated with logged store {@code storeDir} is valid if it exists and is not empty. + * + * @param storeDir the base directory of the store + * @param offsetFileName name of the offset file + * @return true if the offset file is valid. false otherwise. + */ + public static boolean isOffsetFileValid(File storeDir, String offsetFileName) { + boolean hasValidOffsetFile = false; + if (storeDir.exists()) { + String offsetContents = readOffsetFile(storeDir, offsetFileName); + if (offsetContents != null && !offsetContents.isEmpty()) { + hasValidOffsetFile = true; + } else { + LOG.info("Offset file is not valid for store: {}.", storeDir.toPath()); + } + } + + return hasValidOffsetFile; + } + + /** + * Read and return the contents of the offset file. + * + * @param storagePartitionDir the base directory of the store + * @param offsetFileName name of the offset file + * @return the content of the offset file if it exists for the store, null otherwise. 
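These two checks combine into the store-validity test used by both the changelog-backed and the side input storage managers; a hedged sketch with a hypothetical store path and the default one-day retention:

    File storeDir = new File("/tmp/state/profile-store/Partition_0");  // hypothetical location
    boolean usable = StorageManagerUtil.isOffsetFileValid(storeDir, "OFFSET")
        && !StorageManagerUtil.isStaleStore(storeDir, "OFFSET",
            TimeUnit.DAYS.toMillis(1), System.currentTimeMillis());
    // If the offset file is missing, empty, or older than the retention window, the directory
    // is cleaned up and the store is rebuilt from the beginning of its source instead.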
+ */ + public static String readOffsetFile(File storagePartitionDir, String offsetFileName) { + String offset = null; + File offsetFileRef = new File(storagePartitionDir, offsetFileName); + String storePath = storagePartitionDir.getPath(); + + if (offsetFileRef.exists()) { + LOG.info("Found offset file in storage partition directory: {}", storePath); + offset = FileUtil.readWithChecksum(offsetFileRef); + } else { + LOG.info("No offset file found in storage partition directory: {}", storePath); + } + + return offset; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java new file mode 100644 index 0000000..7a0a822 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import com.google.common.collect.ImmutableList; + +import java.io.File; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaStorageConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.FileUtil; + +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * A storage manager for all side input stores. It is associated with each {@link org.apache.samza.container.TaskInstance} + * and is responsible for handling directory management, offset tracking and offset file management for the side input stores. 
+ */ +public class TaskSideInputStorageManager { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class); + private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS"; + private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final Clock clock; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final Map<String, StorageEngine> stores; + private final String storeBaseDir; + private final Map<String, Set<SystemStreamPartition>> storeToSSps; + private final Map<SystemStreamPartition, Set<String>> sspsToStores; + private final StreamMetadataCache streamMetadataCache; + private final SystemAdmins systemAdmins; + private final TaskName taskName; + private final JavaStorageConfig storageConfig; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputStorageManager( + TaskName taskName, + StreamMetadataCache streamMetadataCache, + String storeBaseDir, + Map<String, StorageEngine> sideInputStores, + Map<String, SideInputsProcessor> storesToProcessor, + Map<String, Set<SystemStreamPartition>> storesToSSPs, + SystemAdmins systemAdmins, + Config config, + Clock clock) { + this.clock = clock; + this.storageConfig = new JavaStorageConfig(config); + this.stores = sideInputStores; + this.storeBaseDir = storeBaseDir; + this.storeToSSps = storesToSSPs; + this.streamMetadataCache = streamMetadataCache; + this.systemAdmins = systemAdmins; + this.taskName = taskName; + this.storeToProcessor = storesToProcessor; + + validateStoreConfiguration(); + + this.sspsToStores = new HashMap<>(); + storesToSSPs.forEach((store, ssps) -> { + for (SystemStreamPartition ssp: ssps) { + sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>()); + sspsToStores.computeIfPresent(ssp, (key, value) -> { + value.add(store); + return value; + }); + } + }); + } + + /** + * Initializes the side input storage manager. + */ + public void init() { + LOG.info("Initializing side input stores."); + + Map<SystemStreamPartition, String> fileOffsets = getFileOffsets(); + LOG.info("File offsets for the task {}: ", taskName, fileOffsets); + + Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets(); + LOG.info("Oldest offsets for the task {}: ", taskName, fileOffsets); + + startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets); + LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets); + + lastProcessedOffsets.putAll(fileOffsets); + LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets); + + initializeStoreDirectories(); + } + + /** + * Flushes the contents of the underlying store and writes the offset file to disk. + */ + public void flush() { + LOG.info("Flushing the side input stores."); + stores.values().forEach(StorageEngine::flush); + writeOffsetFiles(); + } + + /** + * Stops the storage engines for all the stores and writes the offset file to disk. + */ + public void stop() { + LOG.info("Stopping the side input stores."); + stores.values().forEach(StorageEngine::stop); + writeOffsetFiles(); + } + + /** + * Gets the {@link StorageEngine} associated with the input {@code storeName} if found, or null. 
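Putting the methods above together, the per-task lifecycle looks roughly like this. It is only a sketch: sideInputStorageManager, ssp and envelope are assumed to be in scope, and in reality SamzaContainer and TaskInstance drive these calls.

    sideInputStorageManager.init();     // read file offsets, resolve starting offsets, prepare store dirs
    String resumeOffset = sideInputStorageManager.getStartingOffset(ssp);  // registered with the consumers
    sideInputStorageManager.process(envelope);  // apply one side input message and track its offset
    sideInputStorageManager.flush();    // flush stores and persist the per-store offset files
    sideInputStorageManager.stop();     // stop stores and persist the offset files one last time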
+ * + * @param storeName store name to get the {@link StorageEngine} for + * @return the {@link StorageEngine} associated with {@code storeName} if found, or null + */ + public StorageEngine getStore(String storeName) { + return stores.get(storeName); + } + + /** + * Gets the starting offset for the given side input {@link SystemStreamPartition}. + * + * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT()} and + * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET()} configurations. It will use the local offset + * file if it is valid, else it will fall back to oldest offset in the stream. + * + * @param ssp side input system stream partition to get the starting offset for + * @return the starting offset + */ + public String getStartingOffset(SystemStreamPartition ssp) { + return startingOffsets.get(ssp); + } + + /** + * Gets the last processed offset for the given side input {@link SystemStreamPartition}. + * + * @param ssp side input system stream partition to get the last processed offset for + * @return the last processed offset + */ + public String getLastProcessedOffset(SystemStreamPartition ssp) { + return lastProcessedOffsets.get(ssp); + } + + /** + * Processes the incoming side input message envelope and updates the last processed offset for its SSP. + * + * @param message incoming message to be processed + */ + public void process(IncomingMessageEnvelope message) { + SystemStreamPartition ssp = message.getSystemStreamPartition(); + Set<String> storeNames = sspsToStores.get(ssp); + + for (String storeName : storeNames) { + SideInputsProcessor sideInputsProcessor = storeToProcessor.get(storeName); + + KeyValueStore keyValueStore = (KeyValueStore) stores.get(storeName); + Collection<Entry<?, ?>> entriesToBeWritten = sideInputsProcessor.process(message, keyValueStore); + keyValueStore.putAll(ImmutableList.copyOf(entriesToBeWritten)); + } + + // update the last processed offset + lastProcessedOffsets.put(ssp, message.getOffset()); + } + + /** + * Initializes the store directories for all the stores: + * 1. Cleans up the directories for invalid stores. + * 2. Ensures that the directories exist. + */ + private void initializeStoreDirectories() { + LOG.info("Initializing side input store directories."); + + stores.keySet().forEach(storeName -> { + File storeLocation = getStoreLocation(storeName); + String storePath = storeLocation.toPath().toString(); + if (!isValidSideInputStore(storeName, storeLocation)) { + LOG.info("Cleaning up the store directory at {} for {}", storePath, storeName); + FileUtil.rm(storeLocation); + } + + if (!storeLocation.exists()) { + LOG.info("Creating {} as the store directory for the side input store {}", storePath, storeName); + storeLocation.mkdirs(); + } + }); + } + + /** + * Writes the offset files for all side input stores one by one. There is one offset file per store. + * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum. 
+ */ + private void writeOffsetFiles() { + storeToSSps.entrySet().stream() + .filter(entry -> isPersistedStore(entry.getKey())) // filter out in-memory side input stores + .forEach((entry) -> { + String storeName = entry.getKey(); + Map<SystemStreamPartition, String> offsets = entry.getValue().stream() + .filter(lastProcessedOffsets::containsKey) + .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get)); + + try { + String fileContents = OBJECT_MAPPER.writeValueAsString(offsets); + File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE); + FileUtil.writeWithChecksum(offsetFile, fileContents); + } catch (Exception e) { + throw new SamzaException("Failed to write offset file for side input store: " + storeName, e); + } + }); + } + + /** + * Gets the side input SSP offsets for all stores from their local offset files. + * + * @return a {@link Map} of {@link SystemStreamPartition} to offset in the offset files. + */ + @SuppressWarnings("unchecked") + private Map<SystemStreamPartition, String> getFileOffsets() { + LOG.info("Loading initial offsets from the file for side input stores."); + Map<SystemStreamPartition, String> fileOffsets = new HashMap<>(); + + stores.keySet().forEach(storeName -> { + LOG.debug("Reading local offsets for store: {}", storeName); + + File storeLocation = getStoreLocation(storeName); + if (isValidSideInputStore(storeName, storeLocation)) { + try { + String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE); + Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, Map.class); + fileOffsets.putAll(offsets); + } catch (Exception e) { + LOG.warn("Failed to load the offset file for side input store:" + storeName, e); + } + } + }); + + return fileOffsets; + } + + private File getStoreLocation(String storeName) { + return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_')); + } + + /** + * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores. + * If the local file offset is available and is greater than the oldest available offset from source, uses it, + * else falls back to oldest offset in the source. + * + * @param fileOffsets offsets from the local offset file + * @param oldestOffsets oldest offsets from the source + * @return a {@link Map} of {@link SystemStreamPartition} to offset + */ + private Map<SystemStreamPartition, String> getStartingOffsets( + Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) { + Map<SystemStreamPartition, String> startingOffsets = new HashMap<>(); + + sspsToStores.keySet().forEach(ssp -> { + String fileOffset = fileOffsets.get(ssp); + String oldestOffset = oldestOffsets.get(ssp); + + startingOffsets.put(ssp, + StorageManagerUtil.getStartingOffset( + ssp, systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, oldestOffset)); + }); + + return startingOffsets; + } + + /** + * Gets the oldest offset for the {@link SystemStreamPartition}s associated with all the store side inputs. + * 1. Groups the list of the SSPs based on system stream + * 2. Fetches the {@link SystemStreamMetadata} from {@link StreamMetadataCache} + * 3. Fetches the partition metadata for each system stream and fetch the corresponding partition metadata + * and populates the oldest offset for SSPs belonging to the system stream. + * + * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. 
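A small worked sketch of step 1 with hypothetical partitions, showing why metadata is fetched once per stream rather than once per partition:

    Map<SystemStream, List<SystemStreamPartition>> bySystemStream = Stream.of(
            new SystemStreamPartition("kafka", "profile-updates", new Partition(0)),
            new SystemStreamPartition("kafka", "profile-updates", new Partition(1)))
        .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
    // => a single kafka.profile-updates entry mapped to both partitions; steps 2 and 3 then look
    // up that stream's SystemStreamMetadata once and read each partition's oldest offset from it.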
+ */ + private Map<SystemStreamPartition, String> getOldestOffsets() { + Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>(); + + // Step 1 + Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = sspsToStores.keySet().stream() + .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream)); + + // Step 2 + Map<SystemStream, SystemStreamMetadata> metadata = JavaConverters.mapAsJavaMapConverter( + streamMetadataCache.getStreamMetadata( + JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(), false)).asJava(); + + // Step 3 + metadata.forEach((systemStream, systemStreamMetadata) -> { + // get the partition metadata for each system stream + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = + systemStreamMetadata.getSystemStreamPartitionMetadata(); + + // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset + Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream() + .collect( + Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset())); + + oldestOffsets.putAll(offsets); + }); + + return oldestOffsets; + } + + private boolean isValidSideInputStore(String storeName, File storeLocation) { + return isPersistedStore(storeName) + && !StorageManagerUtil.isStaleStore(storeLocation, OFFSET_FILE, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis()) + && StorageManagerUtil.isOffsetFileValid(storeLocation, OFFSET_FILE); + } + + private boolean isPersistedStore(String storeName) { + return Optional.ofNullable(stores.get(storeName)) + .map(StorageEngine::getStoreProperties) + .map(StoreProperties::isPersistedToDisk) + .orElse(false); + } + + private void validateStoreConfiguration() { + stores.forEach((storeName, storageEngine) -> { + if (StringUtils.isBlank(storageConfig.getSideInputsProcessorFactory(storeName))) { + throw new SamzaException( + String.format("Side inputs processor factory configuration missing for store: %s.", storeName)); + } + + if (storageEngine.getStoreProperties().isLoggedStore()) { + throw new SamzaException( + String.format("Cannot configure both side inputs and a changelog for store: %s.", storeName)); + } + }); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index e4ee767..c9df3b5 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -76,6 +76,18 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging conf.asScala.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq } + def getSideInputs(storeName: String): Seq[String] = { + new JavaStorageConfig(config).getSideInputs(storeName).asScala + } + + def getSideInputsProcessorFactory(storeName: String): Option[String] = { + Option(new JavaStorageConfig(config).getSideInputsProcessorFactory(storeName)) + } + + def getSideInputsProcessorSerializedInstance(storeName: String): Option[String] = { + Option(new JavaStorageConfig(config).getSideInputsProcessorSerializedInstance(storeName)) + } + /** * Build 
a map of storeName to changeLogDeleteRetention for all of the stores. * @return a map from storeName to the changeLogDeleteRetention of the store in ms. http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 0a4623e..298c8ca 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -42,8 +42,8 @@ object StreamConfig { // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values. private val STREAMS_PREFIX = "streams." - private val STREAM_PREFIX = "systems.%s.streams.%s." + val STREAM_PREFIX = "systems.%s.streams.%s." val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s." val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 89278ad..35802ac 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -30,7 +30,6 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics @@ -48,10 +47,11 @@ import org.apache.samza.job.model.{ContainerModel, JobModel} import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter} import org.apache.samza.serializers._ import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager} +import org.apache.samza.storage._ import org.apache.samza.system._ import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory} import org.apache.samza.table.TableManager +import org.apache.samza.table.utils.SerdeUtils import org.apache.samza.task._ import org.apache.samza.util.Util import org.apache.samza.util._ @@ -354,6 +354,14 @@ object SamzaContainer extends Logging { info("Got intermediate streams: %s" format intermediateStreams) + val sideInputStoresToSystemStreams = config.getStoreNames + .map { storeName => (storeName, config.getSideInputs(storeName)) } + .filter { case (storeName, sideInputs) => sideInputs.nonEmpty } + .map { case (storeName, sideInputs) => (storeName, sideInputs.map(Util.getSystemStreamFromNameOrId(config, _))) } + .toMap + + info("Got side input store system streams: %s" format sideInputStoresToSystemStreams) + val controlMessageKeySerdes = intermediateStreams .flatMap(streamId => { val systemStream = 
config.streamIdToSystemStream(streamId) @@ -531,7 +539,7 @@ object SamzaContainer extends Logging { val nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(config, defaultStoreBaseDir) info("Got base directory for non logged data stores: %s" format nonLoggedStorageBaseDir) - var loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir) + val loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir) info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) val taskStores = storageEngineFactories @@ -577,9 +585,32 @@ object SamzaContainer extends Logging { info("Got task stores: %s" format taskStores) + val taskSSPs = taskModel.getSystemStreamPartitions.asScala.toSet + info("Got task SSPs: %s" format taskSSPs) + + val (sideInputStores, nonSideInputStores) = + taskStores.partition { case (storeName, _) => sideInputStoresToSystemStreams.contains(storeName)} + + val sideInputStoresToSSPs = sideInputStoresToSystemStreams.mapValues(sideInputSystemStreams => + taskSSPs.filter(ssp => sideInputSystemStreams.contains(ssp.getSystemStream)).asJava) + + val taskSideInputSSPs = sideInputStoresToSSPs.values.flatMap(_.asScala).toSet + + info ("Got task side input SSPs: %s" format taskSideInputSSPs) + + val sideInputStoresToProcessor = sideInputStores.keys.map(storeName => { + // serialized instances takes precedence over the factory configuration. + config.getSideInputsProcessorSerializedInstance(storeName).map(serializedInstance => + (storeName, SerdeUtils.deserialize("Side Inputs Processor", serializedInstance))) + .orElse(config.getSideInputsProcessorFactory(storeName).map(factoryClassName => + (storeName, Util.getObj(factoryClassName, classOf[SideInputsProcessorFactory]) + .getSideInputsProcessor(config, taskInstanceMetrics.registry)))) + .get + }).toMap + val storageManager = new TaskStorageManager( taskName = taskName, - taskStores = taskStores, + taskStores = nonSideInputStores, storeConsumers = storeConsumers, changeLogSystemStreams = changeLogSystemStreams, maxChangeLogStreamPartitions, @@ -592,17 +623,24 @@ object SamzaContainer extends Logging { new StorageConfig(config).getChangeLogDeleteRetentionsInMs, new SystemClock) + var sideInputStorageManager: TaskSideInputStorageManager = null + if (sideInputStores.nonEmpty) { + sideInputStorageManager = new TaskSideInputStorageManager( + taskName, + streamMetadataCache, + loggedStorageBaseDir.getPath, + sideInputStores.asJava, + sideInputStoresToProcessor.asJava, + sideInputStoresToSSPs.asJava, + systemAdmins, + config, + new SystemClock) + } + val tableManager = new TableManager(config, serdes.asJava) info("Got table manager") - val systemStreamPartitions = taskModel - .getSystemStreamPartitions - .asScala - .toSet - - info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName) - def createTaskInstance(task: Any): TaskInstance = new TaskInstance( task = task, taskName = taskName, @@ -616,11 +654,13 @@ object SamzaContainer extends Logging { storageManager = storageManager, tableManager = tableManager, reporters = reporters, - systemStreamPartitions = systemStreamPartitions, + systemStreamPartitions = taskSSPs, exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config), jobModel = jobModel, streamMetadataCache = streamMetadataCache, - timerExecutor = timerExecutor) + timerExecutor = timerExecutor, + sideInputSSPs = taskSideInputSSPs, + sideInputStorageManager = sideInputStorageManager) val taskInstance = createTaskInstance(task) 
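For a low-level (Task API) job, the net effect of this wiring is that a store configured with stores.*.side.inputs and a processor factory is populated by the container and read by the task like any other local store. A hedged sketch follows; the store name, stream, key/value types and task class are hypothetical.

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class ProfileJoinTask implements InitableTask, StreamTask {
      private KeyValueStore<String, String> profiles;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        // "profile-store" is kept up to date from its side input stream by the container.
        profiles = (KeyValueStore<String, String>) context.getStore("profile-store");
      }

      @Override
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        String profile = profiles.get((String) envelope.getKey());
        // join the incoming message against the side-input-backed store and emit downstream
      }
    }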
http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 64ee7f3..0caca4f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -28,11 +28,12 @@ import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.storage.TaskStorageManager +import org.apache.samza.storage.kv.KeyValueStore +import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager} import org.apache.samza.system._ import org.apache.samza.table.TableManager import org.apache.samza.task._ -import org.apache.samza.util.Logging +import org.apache.samza.util.{Logging, ScalaJavaUtil} import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ @@ -55,15 +56,29 @@ class TaskInstance( val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler, jobModel: JobModel = null, streamMetadataCache: StreamMetadataCache = null, - timerExecutor : ScheduledExecutorService = null) extends Logging { + timerExecutor : ScheduledExecutorService = null, + sideInputSSPs: Set[SystemStreamPartition] = Set(), + sideInputStorageManager: TaskSideInputStorageManager = null) extends Logging { + val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask] val isClosableTask = task.isInstanceOf[ClosableTask] val isAsyncTask = task.isInstanceOf[AsyncStreamTask] + val kvStoreSupplier = ScalaJavaUtil.toJavaFunction( + (storeName: String) => { + if (storageManager != null && storageManager.getStore(storeName).isDefined) { + storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]] + } else if (sideInputStorageManager != null && sideInputStorageManager.getStore(storeName) != null) { + sideInputStorageManager.getStore(storeName).asInstanceOf[KeyValueStore[_, _]] + } else { + null + } + }) + val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager, - storageManager, tableManager, jobModel, streamMetadataCache, timerExecutor) + kvStoreSupplier, tableManager, jobModel, streamMetadataCache, timerExecutor) // store the (ssp -> if this ssp is catched up) mapping. 
"catched up" // means the same ssp in other taskInstances have the same offset as @@ -85,7 +100,8 @@ class TaskInstance( def registerOffsets { debug("Registering offsets for taskName: %s" format taskName) - offsetManager.register(taskName, systemStreamPartitions) + val sspsToRegister = systemStreamPartitions -- sideInputSSPs + offsetManager.register(taskName, sspsToRegister) } def startStores { @@ -96,6 +112,13 @@ class TaskInstance( } else { debug("Skipping storage manager initialization for taskName: %s" format taskName) } + + if (sideInputStorageManager != null) { + debug("Starting side input storage manager for taskName: %s" format taskName) + sideInputStorageManager.init() + } else { + debug("Skipping side input storage manager initialization for taskName: %s" format taskName) + } } def startTableManager { @@ -128,14 +151,14 @@ class TaskInstance( debug("Registering consumers for taskName: %s" format taskName) systemStreamPartitions.foreach(systemStreamPartition => { - val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition) - .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)) - consumerMultiplexer.register(systemStreamPartition, offset) - metrics.addOffsetGauge(systemStreamPartition, () => { - offsetManager - .getLastProcessedOffset(taskName, systemStreamPartition) - .orNull - }) + val startingOffset = getStartingOffset(systemStreamPartition) + consumerMultiplexer.register(systemStreamPartition, startingOffset) + metrics.addOffsetGauge(systemStreamPartition, () => + if (sideInputSSPs.contains(systemStreamPartition)) { + sideInputStorageManager.getLastProcessedOffset(systemStreamPartition) + } else { + offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull + }) }) } @@ -143,31 +166,37 @@ class TaskInstance( callbackFactory: TaskCallbackFactory = null) { metrics.processes.inc - if (!ssp2CaughtupMapping.getOrElse(envelope.getSystemStreamPartition, - throw new SamzaException(envelope.getSystemStreamPartition + " is not registered!"))) { + val incomingMessageSsp = envelope.getSystemStreamPartition + + if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp, + throw new SamzaException(incomingMessageSsp + " is not registered!"))) { checkCaughtUp(envelope) } - if (ssp2CaughtupMapping(envelope.getSystemStreamPartition)) { + if (ssp2CaughtupMapping(incomingMessageSsp)) { metrics.messagesActuallyProcessed.inc trace("Processing incoming message envelope for taskName and SSP: %s, %s" - format (taskName, envelope.getSystemStreamPartition)) + format (taskName, incomingMessageSsp)) - if (isAsyncTask) { - exceptionHandler.maybeHandle { - val callback = callbackFactory.createCallback() - task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback) - } + if (sideInputSSPs.contains(incomingMessageSsp)) { + sideInputStorageManager.process(envelope) } else { - exceptionHandler.maybeHandle { - task.asInstanceOf[StreamTask].process(envelope, collector, coordinator) - } + if (isAsyncTask) { + exceptionHandler.maybeHandle { + val callback = callbackFactory.createCallback() + task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback) + } + } else { + exceptionHandler.maybeHandle { + task.asInstanceOf[StreamTask].process(envelope, collector, coordinator) + } - trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" - format (taskName, envelope.getSystemStreamPartition, envelope.getOffset)) + trace("Updating offset map for 
taskName, SSP and offset: %s, %s, %s" + format (taskName, incomingMessageSsp, envelope.getOffset)) - offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset) + offsetManager.update(taskName, incomingMessageSsp, envelope.getOffset) + } } } } @@ -217,8 +246,12 @@ class TaskInstance( storageManager.flush } - trace("Checkpointing offsets for taskName: %s" format taskName) + trace("Flushing side input stores for taskName: %s" format taskName) + if (sideInputStorageManager != null) { + sideInputStorageManager.flush() + } + trace("Checkpointing offsets for taskName: %s" format taskName) offsetManager.writeCheckpoint(taskName, checkpoint) if (checkpoint != null) { @@ -249,6 +282,13 @@ class TaskInstance( } else { debug("Skipping storage manager shutdown for taskName: %s" format taskName) } + + if (sideInputStorageManager != null) { + debug("Shutting down side input storage manager for taskName: %s" format taskName) + sideInputStorageManager.stop() + } else { + debug("Skipping side input storage manager shutdown for taskName: %s" format taskName) + } } def shutdownTableManager { @@ -272,27 +312,29 @@ class TaskInstance( * it's already catched-up. */ private def checkCaughtUp(envelope: IncomingMessageEnvelope) = { + val incomingMessageSsp = envelope.getSystemStreamPartition + if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) { - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + ssp2CaughtupMapping(incomingMessageSsp) = true } else { systemAdmins match { case null => { warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up") - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + ssp2CaughtupMapping(incomingMessageSsp) = true } case others => { - val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition) - .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition)) - val system = envelope.getSystemStreamPartition.getSystem + val startingOffset = getStartingOffset(incomingMessageSsp) + + val system = incomingMessageSsp.getSystem others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match { case null => { info("offsets in " + system + " is not comparable. 
Set all SystemStreamPartitions to catched-up") - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable + ssp2CaughtupMapping(incomingMessageSsp) = true // not comparable } case result => { if (result >= 0) { - info(envelope.getSystemStreamPartition.toString + " is catched up.") - ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true + info(incomingMessageSsp.toString + " is catched up.") + ssp2CaughtupMapping(incomingMessageSsp) = true } } } @@ -300,4 +342,18 @@ class TaskInstance( } } } + + private def getStartingOffset(systemStreamPartition: SystemStreamPartition) = { + val offset = + if (sideInputSSPs.contains(systemStreamPartition)) { + Option(sideInputStorageManager.getStartingOffset(systemStreamPartition)) + } else { + offsetManager.getStartingOffset(taskName, systemStreamPartition) + } + + val startingOffset = offset.getOrElse( + throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition)) + + startingOffset + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 62b59fb..90fdc19 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -28,8 +28,6 @@ import org.apache.samza.container.TaskName import org.apache.samza.system._ import org.apache.samza.util.{Clock, FileUtil, Logging} -import scala.collection.JavaConverters._ - object TaskStorageManager { def getStoreDir(storeBaseDir: File, storeName: String) = { new File(storeBaseDir, storeName) @@ -70,7 +68,7 @@ class TaskStorageManager( val fileOffsets: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]() val offsetFileName = "OFFSET" - def apply(storageEngineName: String) = taskStores(storageEngineName) + def getStore(storeName: String): Option[StorageEngine] = taskStores.get(storeName) def init { cleanBaseDirs() @@ -101,7 +99,7 @@ class TaskStorageManager( info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString) FileUtil.rm(loggedStorePartitionDir) } else { - val offset = readOffsetFile(loggedStorePartitionDir) + val offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, offsetFileName) info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir)) if (offset != null) { fileOffsets.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset) @@ -111,7 +109,7 @@ class TaskStorageManager( } /** - * Directory {@code loggedStoreDir} associated with the logged store {@code storeName} is valid, + * Directory loggedStoreDir associated with the logged store storeName is valid * if all of the following conditions are true. * a) If the store has to be persisted to disk. * b) If there is a valid offset file associated with the logged store. @@ -120,55 +118,12 @@ class TaskStorageManager( * @return true if the logged store is valid, false otherwise. 
*/ private def isLoggedStoreValid(storeName: String, loggedStoreDir: File): Boolean = { - val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs.getOrElse(storeName, - StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS) - persistedStores.contains(storeName) && isOffsetFileValid(loggedStoreDir) && - !isStaleLoggedStore(loggedStoreDir, changeLogDeleteRetentionInMs) - } + val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs + .getOrElse(storeName, StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS) - /** - * Determines if the logged store directory {@code loggedStoreDir} is stale. A store is stale if the following condition is true. - * - * ((CurrentTime) - (LastModifiedTime of the Offset file) is greater than the changelog's tombstone retention). - * - * @param loggedStoreDir the base directory of the local change-logged store. - * @param changeLogDeleteRetentionInMs the delete retention of the changelog in milli seconds. - * @return true if the store is stale, false otherwise. - * - */ - private def isStaleLoggedStore(loggedStoreDir: File, changeLogDeleteRetentionInMs: Long): Boolean = { - var isStaleStore = false - val storePath = loggedStoreDir.toPath.toString - if (loggedStoreDir.exists()) { - val offsetFileRef = new File(loggedStoreDir, offsetFileName) - val offsetFileLastModifiedTime = offsetFileRef.lastModified() - if ((clock.currentTimeMillis() - offsetFileLastModifiedTime) >= changeLogDeleteRetentionInMs) { - info ("Store: %s is stale since lastModifiedTime of offset file: %s, " + - "is older than changelog deleteRetentionMs: %s." format(storePath, offsetFileLastModifiedTime, changeLogDeleteRetentionInMs)) - isStaleStore = true - } - } else { - info("Logged storage partition directory: %s does not exist." format storePath) - } - isStaleStore - } - - /** - * An offset file associated with logged store {@code loggedStoreDir} is valid if it exists and is not empty. - * - * @return true if the offset file is valid. false otherwise. - */ - private def isOffsetFileValid(loggedStoreDir: File): Boolean = { - var hasValidOffsetFile = false - if (loggedStoreDir.exists()) { - val offsetContents = readOffsetFile(loggedStoreDir) - if (offsetContents != null && !offsetContents.isEmpty) { - hasValidOffsetFile = true - } else { - info("Offset file is not valid for store: %s." format loggedStoreDir.toPath.toString) - } - } - hasValidOffsetFile + persistedStores.contains(storeName) && + StorageManagerUtil.isOffsetFileValid(loggedStoreDir, offsetFileName) && + !StorageManagerUtil.isStaleStore(loggedStoreDir, offsetFileName, changeLogDeleteRetentionInMs, clock.currentTimeMillis()) } private def setupBaseDirs() { @@ -187,24 +142,6 @@ class TaskStorageManager( } } - /** - * Read and return the contents of the offset file. - * - * @param loggedStoragePartitionDir the base directory of the store - * @return the content of the offset file if it exists for the store, null otherwise. 
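The two private checks deleted in this hunk now live in StorageManagerUtil as well, and the refactored isLoggedStoreValid above simply combines them. A rough Java equivalent, assuming the moved helpers keep the semantics of the removed Scala code (signatures taken from the call sites; the actual implementations are in the new StorageManagerUtil.java):

  import java.io.File;
  import org.apache.samza.util.FileUtil;

  // Sketches only.
  public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
    File offsetFile = new File(storeDir, offsetFileName);
    if (!offsetFile.exists()) {
      return false;
    }
    // Valid means the offset file exists and is non-empty.
    String offset = FileUtil.readWithChecksum(offsetFile);
    return offset != null && !offset.isEmpty();
  }

  public static boolean isStaleStore(File storeDir, String offsetFileName,
      long deleteRetentionMs, long currentTimeMs) {
    if (!storeDir.exists()) {
      return false;
    }
    // Stale means the offset file was last updated longer ago than the changelog's
    // delete (tombstone) retention, so a restore could silently miss deletes.
    long offsetLastModified = new File(storeDir, offsetFileName).lastModified();
    return currentTimeMs - offsetLastModified >= deleteRetentionMs;
  }
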
- */ - private def readOffsetFile(loggedStoragePartitionDir: File): String = { - var offset : String = null - val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName) - if (offsetFileRef.exists()) { - info("Found offset file in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString) - offset = FileUtil.readWithChecksum(offsetFileRef) - } else { - info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString) - } - offset - } - private def validateChangelogStreams() = { info("Validating change log streams: " + changeLogSystemStreams) @@ -262,22 +199,7 @@ class TaskStorageManager( .getOrElse(systemStreamPartition.getSystemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)) - if (fileOffset != null) { - // File offset was the last message written to the changelog that is also reflected in the store, - // so we start with the NEXT offset - val resumeOffset = admin.getOffsetsAfter(Map(systemStreamPartition -> fileOffset).asJava).get(systemStreamPartition) - if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) { - resumeOffset - } else { - // If the offset we plan to use is older than the oldest offset, just use the oldest offset. - // This can happen with changelogs configured with a TTL cleanup policy - warn(s"Local store offset $resumeOffset is lower than the oldest offset $oldestOffset of the changelog. " + - s"The values between these offsets cannot be restored.") - oldestOffset - } - } else { - oldestOffset - } + StorageManagerUtil.getStartingOffset(systemStreamPartition, admin, fileOffset, oldestOffset) } private def restoreStores() { http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala index 4b93543..46a2089 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala @@ -33,7 +33,7 @@ object FileUtil { * @param file The file handle to write to * @param data The data to be written to the file * */ - def writeWithChecksum(file: File, data: String) = { + def writeWithChecksum(file: File, data: String): Unit = { val checksum = getChecksum(data) var oos: ObjectOutputStream = null var fos: FileOutputStream = null @@ -52,7 +52,7 @@ object FileUtil { * Reads from a file that has a checksum prepended to the data * @param file The file handle to read from * */ - def readWithChecksum(file: File) = { + def readWithChecksum(file: File): String = { var fis: FileInputStream = null var ois: ObjectInputStream = null try { @@ -76,7 +76,7 @@ object FileUtil { * Recursively remove a directory (or file), and all sub-directories. Equivalent * to rm -rf. 
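Likewise, the changelog resume-offset rule that used to be inlined in TaskStorageManager.getStartingOffset above is now centralized. A sketch of the shared helper, with the signature taken from the call site and the logic assumed to match the removed code: resume from the offset immediately after the locally recorded file offset, unless that offset has already been trimmed from the changelog, in which case restore from the oldest available offset.

  import java.util.Collections;
  import org.apache.samza.system.SystemAdmin;
  import org.apache.samza.system.SystemStreamPartition;

  // Sketch only; the real implementation is part of the new StorageManagerUtil.java.
  public static String getStartingOffset(SystemStreamPartition ssp, SystemAdmin admin,
      String fileOffset, String oldestOffset) {
    if (fileOffset == null) {
      // No local offset file: restore the entire changelog from its oldest offset.
      return oldestOffset;
    }
    // The file offset is the last changelog message already reflected in the store,
    // so resume from the very next offset.
    String resumeOffset = admin.getOffsetsAfter(Collections.singletonMap(ssp, fileOffset)).get(ssp);
    if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
      return resumeOffset;
    }
    // The resume offset is older than the changelog's oldest offset (e.g. TTL cleanup);
    // the gap cannot be restored, so fall back to the oldest offset.
    return oldestOffset;
  }
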
*/ - def rm(file: File) { + def rm(file: File): Unit = { if (file == null) { return } else if (file.isDirectory) { @@ -96,7 +96,7 @@ object FileUtil { * @param data The string for which checksum has to be generated * @return long type value representing the checksum * */ - def getChecksum(data: String) = { + def getChecksum(data: String): Long = { val crc = new CRC32 crc.update(data.getBytes) crc.getValue http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala index f3ba746..a359cd5 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala @@ -21,6 +21,8 @@ package org.apache.samza.util +import java.util.function + import scala.collection.immutable.Map import scala.collection.JavaConverters._ import scala.runtime.AbstractFunction0 @@ -47,6 +49,12 @@ object ScalaJavaUtil { } } + def toJavaFunction[T, R](scalaFunction: Function1[T, R]): java.util.function.Function[T, R] = { + new function.Function[T, R] { + override def apply(t: T): R = scalaFunction.apply(t) + } + } + /** * Wraps the provided Java Supplier in an Scala Function, e.g. for use in [[Option#getOrDefault]] * @@ -59,4 +67,8 @@ object ScalaJavaUtil { override def apply(): T = javaFunction.get() } } + + def toScalaFunction[T, R](javaFunction: java.util.function.Function[T, R]): Function1[T, R] = { + t => javaFunction.apply(t) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 059eb03..c9534bc 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -87,6 +87,26 @@ object Util extends Logging { } /** + * Gets the [[SystemStream]] corresponding to the provided stream, which may be + * a streamId, or stream name of the format systemName.streamName. + * + * @param stream the stream name or id to get the { @link SystemStream} for. + * @return the [[SystemStream]] for the stream + */ + def getSystemStreamFromNameOrId(config: Config, stream: String): SystemStream = { + val parts = stream.split(".") + if (parts.length == 0 || parts.length > 2) { + throw new SamzaException( + String.format("Invalid stream %s. 
Expected to be of the format streamId or systemName.streamName", stream)) + } + if (parts.length == 1) { + new StreamConfig(config).streamIdToSystemStream(stream) + } else { + new SystemStream(parts(0), parts(1)) + } + } + + /** * Returns the the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback * * @return the [[java.net.InetAddress]] which represents the localhost http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 88767f5..9cdbfe6 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -82,7 +82,7 @@ public class TestAsyncRunLoop { return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics, null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, - new scala.collection.immutable.HashSet<String>()), null, null, null); + new scala.collection.immutable.HashSet<String>()), null, null, null, new scala.collection.immutable.HashSet<>(), null); } interface TestCode { http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 4ff7848..1672191 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -47,7 +47,6 @@ import org.mockito.Mockito import org.mockito.Mockito._ import org.scalatest.Assertions.intercept -import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.collection.JavaConverters._ @@ -377,7 +376,7 @@ class TestTaskInstance { val mockOrder = inOrder(offsetManager, collector, storageManager) val taskInstance: TaskInstance = new TaskInstance( - Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask], + Mockito.mock(classOf[StreamTask]), taskName, new MapConfig, new TaskInstanceMetrics, @@ -418,7 +417,7 @@ class TestTaskInstance { val offsetManager = Mockito.mock(classOf[OffsetManager]) val taskInstance: TaskInstance = new TaskInstance( - Mockito.mock(classOf[StreamTask]).asInstanceOf[StreamTask], + Mockito.mock(classOf[StreamTask]), taskName, new MapConfig, new TaskInstanceMetrics, http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index 3bb4e99..0b945cb 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -151,7 +151,7 @@ class TestTaskStorageManager extends 
MockitoSugar { val ss = new SystemStream("kafka", "testStream") val partition = new Partition(0) val ssp = new SystemStreamPartition(ss, partition) - val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName) + val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null) @@ -182,12 +182,12 @@ class TestTaskStorageManager extends MockitoSugar { taskManager.init // Verify that the store directory doesn't have ANY files - assertNull(storeDirectory.listFiles()) + assertTrue(storeDirectory.list().isEmpty) verify(mockSystemConsumer).register(ssp, "0") // Test 2: flush should NOT create/update the offset file. Store directory has no files taskManager.flush() - assertNull(storeDirectory.listFiles()) + assertTrue(storeDirectory.list().isEmpty) // Test 3: Update sspMetadata before shutdown and verify that offset file is NOT created metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { @@ -197,7 +197,7 @@ class TestTaskStorageManager extends MockitoSugar { }) when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) taskManager.stop() - assertNull(storeDirectory.listFiles()) + assertTrue(storeDirectory.list().isEmpty) // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the earliest offset metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { @@ -209,7 +209,7 @@ class TestTaskStorageManager extends MockitoSugar { taskManager.init - assertNull(storeDirectory.listFiles()) + assertTrue(storeDirectory.list().isEmpty) // second time to register; make sure it starts from beginning verify(mockSystemConsumer, times(2)).register(ssp, "0") } @@ -223,7 +223,7 @@ class TestTaskStorageManager extends MockitoSugar { val taskStorageManager = new TaskStorageManagerBuilder() .addStore(store, false) - .addStore(loggedStore, true) + .addLoggedStore(loggedStore, true) .build //Invoke test method @@ -244,7 +244,7 @@ class TestTaskStorageManager extends MockitoSugar { FileUtil.writeWithChecksum(offsetFilePath, "100") val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore, true) + .addLoggedStore(loggedStore, true) .build val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs", @@ -266,7 +266,7 @@ class TestTaskStorageManager extends MockitoSugar { FileUtil.writeWithChecksum(offsetFile, "Test Offset Data") offsetFile.setLastModified(0) val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false) - .addStore(loggedStore, true) + .addLoggedStore(loggedStore, true) .build val cleanDirMethod = taskStorageManager.getClass @@ -285,7 +285,7 @@ class TestTaskStorageManager extends MockitoSugar { FileUtil.writeWithChecksum(offsetFilePath, "100") val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore, false) + .addLoggedStore(loggedStore, false) .build val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs", @@ -647,6 +647,13 @@ class TaskStorageManagerBuilder extends MockitoSugar { addStore(storeName, mockStorageEngine, mock[SystemConsumer]) } + def addLoggedStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = { + val mockStorageEngine = 
mock[StorageEngine] + when(mockStorageEngine.getStoreProperties) + .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(true).build()) + addStore(storeName, mockStorageEngine, mock[SystemConsumer]) + } + def setPartition(p: Partition) = { partition = p this http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java index 2681fb3..2a9532b 100644 --- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java @@ -50,7 +50,8 @@ public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDesc Map<String, String> tableSpecConfig = new HashMap<>(); generateTableSpecConfig(tableSpecConfig); - return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig); + return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig, + sideInputs, sideInputsProcessor); } private void addInMemoryConfig(Map<String, String> map, String key, String value) { http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java index 2c62159..33bfc84 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java @@ -182,7 +182,8 @@ public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescr Map<String, String> tableSpecConfig = new HashMap<>(); generateTableSpecConfig(tableSpecConfig); - return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig); + return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig, + sideInputs, sideInputsProcessor); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java index 1f9b57b..2d05f95 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java @@ -18,9 +18,13 @@ */ package org.apache.samza.storage.kv; +import com.google.common.base.Preconditions; + +import java.util.List; import java.util.Map; import org.apache.samza.operators.BaseTableDescriptor; +import org.apache.samza.storage.SideInputsProcessor; /** @@ -32,6 +36,8 @@ import 
org.apache.samza.operators.BaseTableDescriptor; */ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>> extends BaseTableDescriptor<K, V, D> { + protected List<String> sideInputs; + protected SideInputsProcessor sideInputsProcessor; /** * Constructs a table descriptor instance @@ -41,6 +47,16 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo super(tableId); } + public D withSideInputs(List<String> sideInputs) { + this.sideInputs = sideInputs; + return (D) this; + } + + public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) { + this.sideInputsProcessor = sideInputsProcessor; + return (D) this; + } + @Override protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) { super.generateTableSpecConfig(tableSpecConfig); @@ -51,6 +67,11 @@ abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLo */ protected void validate() { super.validate(); + if (sideInputs != null || sideInputsProcessor != null) { + Preconditions.checkArgument(sideInputs != null && !sideInputs.isEmpty() && sideInputsProcessor != null, + String.format("Invalid side input configuration for table: %s. " + + "Both side inputs and the processor must be provided", tableId)); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/fa56b15d/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java index b494eba..cacfe95 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java @@ -19,9 +19,11 @@ package org.apache.samza.storage.kv; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.samza.SamzaException; +import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; @@ -30,6 +32,7 @@ import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; import org.apache.samza.table.TableProvider; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.utils.SerdeUtils; import org.apache.samza.task.TaskContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +105,15 @@ abstract public class BaseLocalStoreBackedTableProvider implements TableProvider String valueSerde = tableConfig.getValueSerde(tableSpec.getId()); storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde); + List<String> sideInputs = tableSpec.getSideInputs(); + if (sideInputs != null && !sideInputs.isEmpty()) { + String formattedSideInputs = String.join(",", sideInputs); + + storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs); + storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()), + SerdeUtils.serialize("Side Inputs Processor", tableSpec.getSideInputsProcessor())); + } + return storeConfig; }
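Putting the descriptor changes together: a local table can now be fed from side input streams, with the processor deciding what gets written to the backing store. A hypothetical usage sketch (table id, stream name, and key/value types are made up; serde setup and the rest of the descriptor configuration are omitted):

  import java.util.Arrays;
  import java.util.Collections;
  import org.apache.samza.storage.kv.Entry;
  import org.apache.samza.storage.kv.RocksDbTableDescriptor;

  // e.g. when declaring the application's tables. Side inputs may be referenced by
  // streamId or by "systemName.streamName" (see Util.getSystemStreamFromNameOrId
  // earlier in this diff).
  RocksDbTableDescriptor<String, String> profiles =
      new RocksDbTableDescriptor<String, String>("profiles")
          .withSideInputs(Arrays.asList("profile-updates"))
          // Called for every incoming side-input message; returns the entries to write.
          .withSideInputsProcessor((message, store) ->
              Collections.<Entry<?, ?>>singletonList(
                  new Entry<String, String>((String) message.getKey(), (String) message.getMessage())));

Per the validate() change above, side inputs and the processor must be configured together; BaseLocalStoreBackedTableProvider then joins the stream names and serializes the processor into the generated store config (the SIDE_INPUTS and SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE entries shown above).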
