bkonold commented on a change in pull request #1367: URL: https://github.com/apache/samza/pull/1367#discussion_r435079989
########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import 
scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. + */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; Review comment: yeah, that was the intent in which case i should move `lastProcessedOffsets` as well. unless you have any objections ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. 
+ */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; + private final TaskSideInputStorageManager taskSideInputStorageManager; + private final Map<SystemStreamPartition, Set<String>> sspToStores; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final SystemAdmins systemAdmins; + private final StreamMetadataCache streamMetadataCache; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputHandler( + TaskName taskName, + TaskMode taskMode, + File storeBaseDir, + Map<String, StorageEngine> storeToStorageEngines, + Map<String, Set<SystemStreamPartition>> storeToSSPs, + Map<String, SideInputsProcessor> storeToProcessor, + SystemAdmins systemAdmins, + StreamMetadataCache streamMetadataCache, + Clock clock) { Review comment: that's fine. i think what probably happened was i did a "change signature" refactor with intellij that changed the formatting in the storage manager class. personally i prefer single line for readability, but don't have a strong opinion. i do think though that we should have some sort of style pattern guidelines as part of open source docs - i don't see any there now. ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. 
+ */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; + private final TaskSideInputStorageManager taskSideInputStorageManager; + private final Map<SystemStreamPartition, Set<String>> sspToStores; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final SystemAdmins systemAdmins; + private final StreamMetadataCache streamMetadataCache; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputHandler( + TaskName taskName, + TaskMode taskMode, + File storeBaseDir, + Map<String, StorageEngine> storeToStorageEngines, + Map<String, Set<SystemStreamPartition>> storeToSSPs, + Map<String, SideInputsProcessor> storeToProcessor, + SystemAdmins systemAdmins, + StreamMetadataCache streamMetadataCache, + Clock clock) { + this.taskName = taskName; + this.systemAdmins = systemAdmins; + this.streamMetadataCache = streamMetadataCache; + this.storeToProcessor = storeToProcessor; + + this.sspToStores = storeToSSPs.entrySet().stream() + .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream() + .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey()))) + .collect(Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); + + this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName, + taskMode, + storeBaseDir, + storeToStorageEngines, + storeToSSPs, + clock); + + validateProcessorConfiguration(); Review comment: agree, i will do the same in `TaskSideInputStorageManager` as well since that validation also happens at the end of the constructor and doesn't need to. 
########## File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.samza.Partition; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.ScalaJavaUtil; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; Review comment: fwiw i see a lot of usage of both styles (`*` static import and per-method static import). will change 
########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java ########## @@ -279,13 +164,12 @@ void writeOffsetFiles() { } /** - * Gets the side input SSP offsets for all stores from their local offset files. + * Gets the side input SSP offsets for all stores from their local offset files. This method should be executed only + * once at class initialization. Review comment: ah, i think i should actually remove this. there is no problem calling multiple times. at some point when writing this refactor i was unsure what the contract of this would be with `TaskSideInputHandler` when it came to initialization and wrote this comment. but all this does is read the offset files - caller can do as they please. ########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -329,18 +345,14 @@ public ContainerStorageManager( /** * Creates SystemConsumer objects for store restoration, creating one consumer per system. */ - private static Map<String, SystemConsumer> createConsumers(Map<String, Set<SystemStream>> systemStreams, + private static Map<String, SystemConsumer> createConsumers(Set<String> systems, Review comment: sure, i'll keep storeSystems ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. 
+ */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; + private final TaskSideInputStorageManager taskSideInputStorageManager; + private final Map<SystemStreamPartition, Set<String>> sspToStores; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final SystemAdmins systemAdmins; + private final StreamMetadataCache streamMetadataCache; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputHandler( + TaskName taskName, + TaskMode taskMode, + File storeBaseDir, + Map<String, StorageEngine> storeToStorageEngines, + Map<String, Set<SystemStreamPartition>> storeToSSPs, + Map<String, SideInputsProcessor> storeToProcessor, + SystemAdmins systemAdmins, + StreamMetadataCache streamMetadataCache, + Clock clock) { + this.taskName = taskName; + this.systemAdmins = systemAdmins; + this.streamMetadataCache = streamMetadataCache; + this.storeToProcessor = storeToProcessor; + + this.sspToStores = storeToSSPs.entrySet().stream() + .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream() + .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey()))) + .collect(Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); + + this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName, + taskMode, + storeBaseDir, + storeToStorageEngines, + storeToSSPs, + clock); + + validateProcessorConfiguration(); + } + + /** + * The {@link TaskName} associated with this {@link TaskSideInputHandler} + * + * @return the task name for this handler + */ + public TaskName getTaskName() { + return this.taskName; + } + + /** + * Initializes the underlying {@link 
TaskSideInputStorageManager} and determines starting offsets for each SSP. + */ + public void init() { + this.taskSideInputStorageManager.init(); + + Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets(); + LOG.info("File offsets for the task {}: {}", taskName, fileOffsets); + + this.lastProcessedOffsets.putAll(fileOffsets); + LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets); + + this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets()); + LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets); + } + + /** + * Processes the incoming side input message envelope and updates the last processed offset for its SSP. + * Synchronized inorder to be exclusive with flush(). + * + * @param envelope incoming envelope to be processed + */ + public synchronized void process(IncomingMessageEnvelope envelope) { Review comment: i was only preserving the existing synchronization so don't know full context, but it appears to be necessary since `flush` and `process` are currently invoked from different threads concurrently. seems to imply that the rationale was so that `writeOffsetFiles` would write the same offset for SSPs shared between stores. so i don't think `lastProcessedOffsets` being thread-safe is enough. in fact, i don't think it needs to be thread-safe at all since `init` and all `process` calls are exclusive from one another. ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. 
+ */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; + private final TaskSideInputStorageManager taskSideInputStorageManager; + private final Map<SystemStreamPartition, Set<String>> sspToStores; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final SystemAdmins systemAdmins; + private final StreamMetadataCache streamMetadataCache; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputHandler( + TaskName taskName, + TaskMode taskMode, + File storeBaseDir, + Map<String, StorageEngine> storeToStorageEngines, + Map<String, Set<SystemStreamPartition>> storeToSSPs, + Map<String, SideInputsProcessor> storeToProcessor, + SystemAdmins systemAdmins, + StreamMetadataCache streamMetadataCache, + Clock clock) { + this.taskName = taskName; + this.systemAdmins = systemAdmins; + this.streamMetadataCache = streamMetadataCache; + this.storeToProcessor = storeToProcessor; + + this.sspToStores = storeToSSPs.entrySet().stream() + .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream() + .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey()))) + .collect(Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); + + this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName, + taskMode, + storeBaseDir, + storeToStorageEngines, + storeToSSPs, + clock); + + validateProcessorConfiguration(); + } + + /** + * The {@link TaskName} associated with this {@link TaskSideInputHandler} + * + * @return the task name for this handler + */ + public TaskName getTaskName() { + return this.taskName; + } + + /** + * Initializes the underlying {@link 
TaskSideInputStorageManager} and determines starting offsets for each SSP. + */ + public void init() { + this.taskSideInputStorageManager.init(); + + Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets(); + LOG.info("File offsets for the task {}: {}", taskName, fileOffsets); + + this.lastProcessedOffsets.putAll(fileOffsets); + LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets); + + this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets()); + LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets); + } + + /** + * Processes the incoming side input message envelope and updates the last processed offset for its SSP. + * Synchronized inorder to be exclusive with flush(). + * + * @param envelope incoming envelope to be processed + */ + public synchronized void process(IncomingMessageEnvelope envelope) { + SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition(); + String envelopeOffset = envelope.getOffset(); + + for (String store: this.sspToStores.get(envelopeSSP)) { + SideInputsProcessor storeProcessor = this.storeToProcessor.get(store); + KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store); + Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore); + + // TODO: SAMZA-2255: optimize writes to side input stores + for (Entry entry : entriesToBeWritten) { + // If the key is null we ignore, if the value is null, we issue a delete, else we issue a put + if (entry.getKey() != null) { + if (entry.getValue() != null) { + keyValueStore.put(entry.getKey(), entry.getValue()); + } else { + keyValueStore.delete(entry.getKey()); + } + } + } + } + + this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset); + } + + /** + * Flushes the underlying {@link TaskSideInputStorageManager} + * Synchronized inorder to be exclusive with process() + */ + public synchronized void 
flush() { + this.taskSideInputStorageManager.flush(this.lastProcessedOffsets); + } + + /** + * Gets the starting offset for the given side input {@link SystemStreamPartition}. + * + * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and + * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} configurations. It will use the local offset + * file if it is valid, else it will fall back to oldest offset in the stream. + * + * @param ssp side input system stream partition to get the starting offset for + * @return the starting offset + */ + public String getStartingOffset(SystemStreamPartition ssp) { + return this.startingOffsets.get(ssp); + } + + /** + * Gets the last processed offset for the given side input {@link SystemStreamPartition}. + * + * @param ssp side input system stream partition to get the last processed offset for + * @return the last processed offset + */ + public String getLastProcessedOffset(SystemStreamPartition ssp) { + return this.lastProcessedOffsets.get(ssp); + } + + /** + * Stops the underlying storage manager at the last processed offsets. + */ + public void stop() { Review comment: i don't think `stop` requires synchronization? CSM will have ceased interaction with the instance before invoking `stop`, so there will be nothing else to synchronize between. though i should probably doc that precondition ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. 
+ */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; + private final TaskSideInputStorageManager taskSideInputStorageManager; + private final Map<SystemStreamPartition, Set<String>> sspToStores; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final SystemAdmins systemAdmins; + private final StreamMetadataCache streamMetadataCache; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputHandler( + TaskName taskName, + TaskMode taskMode, + File storeBaseDir, + Map<String, StorageEngine> storeToStorageEngines, + Map<String, Set<SystemStreamPartition>> storeToSSPs, + Map<String, SideInputsProcessor> storeToProcessor, + SystemAdmins systemAdmins, + StreamMetadataCache streamMetadataCache, + Clock clock) { + this.taskName = taskName; + this.systemAdmins = systemAdmins; + this.streamMetadataCache = streamMetadataCache; + this.storeToProcessor = storeToProcessor; + + this.sspToStores = storeToSSPs.entrySet().stream() + .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream() + .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey()))) + .collect(Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); + Review comment: no strong feelings on this. generally i favor streams over imperative when it comes to transforms as i find them easier to conceptualize and less error-prone since they make any modification of the source explicit. fine reverting... 
but interested to hear any additional thoughts you have ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.TaskMode; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; + + +/** + * This class encapsulates all processing logic / state for all side input SSPs within a task. + */ +public class TaskSideInputHandler { + private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class); + + private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil(); + + private final TaskName taskName; + private final TaskSideInputStorageManager taskSideInputStorageManager; + private final Map<SystemStreamPartition, Set<String>> sspToStores; + private final Map<String, SideInputsProcessor> storeToProcessor; + private final SystemAdmins systemAdmins; + private final StreamMetadataCache streamMetadataCache; + private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>(); + + private Map<SystemStreamPartition, String> startingOffsets; + + public TaskSideInputHandler( + TaskName taskName, + TaskMode taskMode, + File storeBaseDir, + Map<String, StorageEngine> storeToStorageEngines, + Map<String, Set<SystemStreamPartition>> storeToSSPs, + Map<String, SideInputsProcessor> storeToProcessor, + SystemAdmins systemAdmins, + StreamMetadataCache streamMetadataCache, + Clock clock) { + this.taskName = taskName; + this.systemAdmins = systemAdmins; + this.streamMetadataCache = streamMetadataCache; + this.storeToProcessor = storeToProcessor; + + this.sspToStores = storeToSSPs.entrySet().stream() + .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream() + .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey()))) + .collect(Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); + + this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName, + taskMode, + storeBaseDir, + storeToStorageEngines, + storeToSSPs, + clock); Review comment: ended up moving since CSM had otherwise no reason to keep track of the instance 
and I thought it better encapsulated lifecycle management (e.g. CSM can't mistakenly hold on to an instance that has been closed). I may actually end up moving this back out depending on particular implementation details when I move side input processing onto `RunLoop`. I may end up sharing the storage manager instances across the handler class and a new class which implements `RunLoopTask`. What are your thoughts for/against? If this is relatively inconsequential, I suggest we table this as it's likely to change again in a subsequent PR. ########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java ########## @@ -258,9 +142,10 @@ private void initializeStoreDirectories() { /** * Writes the offset files for all side input stores one by one. There is one offset file per store. * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum. + * + * @param lastProcessedOffsets The offset per SSP to write */ - @VisibleForTesting - void writeOffsetFiles() { + public void writeOffsetFiles(Map<SystemStreamPartition, String> lastProcessedOffsets) { Review comment: Sure, good suggestion. I'll change the name. ########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -714,15 +729,15 @@ private void startSideInputs() { LOG.info("SideInput Restore started"); // initialize the sideInputStorageManagers - getSideInputStorageManagers().forEach(sideInputStorageManager -> sideInputStorageManager.init()); + getSideInputHandlers().forEach(handler -> handler.init()); Review comment: Agreed - I left this alone initially to try to minimize changes. Will use a reference instead. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: [email protected]
