mynameborat commented on a change in pull request #1343:
URL: https://github.com/apache/samza/pull/1343#discussion_r407736469



##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -138,7 +138,6 @@ object SamzaContainer extends Logging {
     val containerModel = jobModel.getContainers.get(containerId)
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
-

Review comment:
       revert
   

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends 
NonTransactionalTaskSideInputStorageManager implements 
TaskSideInputStorageManager {

Review comment:
       I feel this pattern restricts how the Transactional and 
Non-Transactional side input storage managers can evolve.
   1. It is not clear which responsibilities are inherited as-is from the 
non-transactional side input storage manager and which need to be modified. 
That also ties this class to the evolution of the non-transactional storage 
manager code.
   2. Extending the NonTransactional storage manager introduces coupling: any 
change to the former requires re-testing the transactional storage manager 
as well.
   
   What do you think about making `TaskSideInputStorageManager` an abstract 
class that holds the common responsibilities? That way it is explicit what 
the additional responsibilities of each extension are, and it encourages 
isolation and single responsibility for these extensions.
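
   A rough sketch of the abstract-class idea, using simplified stand-in types. 
The class names, fields, and signatures below are hypothetical and are not the 
PR's actual API; stores are reduced to a name-to-directory map for brevity:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: holds what both variants share and makes the
// variant-specific responsibility explicit as an abstract method.
abstract class SideInputStorageManagerBase {
  // store name -> store directory (illustrative stand-in for StorageEngine)
  protected final Map<String, String> stores;

  SideInputStorageManagerBase(Map<String, String> stores) {
    this.stores = new HashMap<>(stores);
  }

  // Common responsibility, identical for both variants.
  final Map<String, String> getStoreLocations() {
    return new HashMap<>(stores);
  }

  // Each variant must state its own checkpoint behavior.
  abstract Map<String, String> checkpoint(String checkpointId);
}

final class NonTransactionalSketch extends SideInputStorageManagerBase {
  NonTransactionalSketch(Map<String, String> stores) {
    super(stores);
  }

  @Override
  Map<String, String> checkpoint(String checkpointId) {
    // No checkpoint directories are created in the non-transactional case.
    return new HashMap<>();
  }
}

final class TransactionalSketch extends SideInputStorageManagerBase {
  TransactionalSketch(Map<String, String> stores) {
    super(stores);
  }

  @Override
  Map<String, String> checkpoint(String checkpointId) {
    // One checkpoint path per store, derived from the store directory.
    Map<String, String> paths = new HashMap<>();
    stores.forEach((store, dir) -> paths.put(store, dir + "-" + checkpointId));
    return paths;
  }
}
```

   With this shape neither variant inherits behavior from the other, so a 
change to one does not silently alter the other.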

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/SideInputRestoreTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallbackFactory;
+import scala.collection.JavaConversions;
+
+
+class SideInputRestoreTask extends RunLoopTask {
+
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputRestoreTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskSideInputStorageManager taskSideInputStorageManager,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.taskSideInputStorageManager = taskSideInputStorageManager;
+    this.metrics = metrics;

Review comment:
       A few questions:
   
   1. Can we leverage these metrics within this class and have them updated 
for process & commit operations?
   2. Do these metrics impact the task instance metrics statistics on the 
container, or, since this is used by a separate run loop instance, do we fire 
them under a different context?
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -625,7 +625,9 @@ public void run() {
               log.trace("Update offset for ssp {}, offset {}", 
envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update offset
-              task.offsetManager().update(task.taskName(), 
envelope.getSystemStreamPartition(), envelope.getOffset());
+              if (task.offsetManager() != null) {

Review comment:
       Why is this nullable now? Can we still have a no-op implementation as 
default to eliminate this check?
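
   A minimal sketch of the no-op default, assuming a cut-down, hypothetical 
`OffsetUpdater` interface (the real `OffsetManager` has a much wider API, so 
this only illustrates the pattern):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal interface standing in for the offset manager.
interface OffsetUpdater {
  void update(String taskName, String ssp, String offset);

  // Shared no-op instance for tasks that do not track offsets this way.
  OffsetUpdater NO_OP = (taskName, ssp, offset) -> { };
}

// Illustrative implementation that records the latest offset per task and SSP.
final class RecordingOffsetUpdater implements OffsetUpdater {
  final Map<String, String> offsets = new HashMap<>();

  @Override
  public void update(String taskName, String ssp, String offset) {
    offsets.put(taskName + "/" + ssp, offset);
  }
}

final class RunLoopSketch {
  // No null check needed: every task supplies some OffsetUpdater.
  static void onProcessed(OffsetUpdater updater, String taskName, String ssp, String offset) {
    updater.update(taskName, ssp, offset);
  }
}
```

   A task without offset tracking would pass `OffsetUpdater.NO_OP` and the run 
loop code stays unconditional.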

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,13 +114,13 @@
 public class ContainerStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
-  private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read 
Thread";
-  private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush 
Thread";
+  private static final String SIDEINPUTS_RUNLOOP_THREAD_NAME = "SideInputs 
RunLoop Thread";
+  private static final String SIDEINPUTS_CHECKPOINT_THREAD_NAME = "SideInputs 
Checkpoint Refresh Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
   // We use a prefix to differentiate the SystemConsumersMetrics for 
sideInputs from the ones in SamzaContainer
 
   private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; // 
Timeout with which sideinput read thread checks for exceptions
-  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = 
Duration.ofMinutes(1); // Period with which sideinputs are flushed
+  private static final Duration SIDE_INPUT_CHECKPOINT_SHUTDOWN_TIMEOUT = 
Duration.ofMinutes(1); // Period with which sideinputs are flushed

Review comment:
       Did you mean checkpoint timeout? What does the shutdown refer to?

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,13 +114,13 @@
 public class ContainerStorageManager {

Review comment:
       `ContainerStorageManager` feels too heavy already and this is adding 
more to it. Is it possible to extract the responsibilities and move to another 
class? I think we should at least evaluate and come up with a plan to simplify 
and extract some of the responsibilities out of this class. 

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends 
NonTransactionalTaskSideInputStorageManager implements 
TaskSideInputStorageManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalTaskSideInputStorageManager.class);
+
+  public TransactionalTaskSideInputStorageManager(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      Clock clock) {
+    super(taskName, taskMode, storeBaseDir, sideInputStores, storesToSSPs, 
clock);
+  }
+
+  @Override
+  public Map<String, Path> checkpoint(CheckpointId checkpointId) {
+    LOG.info("Creating checkpoint for task: {}", this.taskName);
+
+    Map<String, Path> checkpointPaths = new HashMap<>();
+    stores.forEach((store, storageEngine) ->
+      // TODO what subset of stores to checkpoint? an ssp can be a changelog 
side input for one store and a regular side input for another store
+      storageEngine.checkpoint(checkpointId).ifPresent(path -> 
checkpointPaths.put(store, path))
+    );
+    return checkpointPaths;
+  }
+
+  @Override
+  public void removeOldCheckpoints(String latestCheckpointId) {
+    LOG.info("Removing checkpoints older than: {} for task: {}", 
latestCheckpointId, this.taskName);
+    File[] storeDirs = storeBaseDir.listFiles((dir, name) -> 
stores.containsKey(name));
+    (storeDirs == null ? Stream.<File>empty() : 
Arrays.stream(storeDirs)).forEach(storeDir -> {
+        String taskStoreName = 
storageManagerUtil.getTaskStoreDir(storeBaseDir, storeDir.getName(), taskName, 
taskMode).getName();
+        FileFilter wildcardFileFilter = new WildcardFileFilter(taskStoreName + 
"-*");
+        File[] checkpointDirs = storeDir.listFiles(wildcardFileFilter);
+        (checkpointDirs == null ? Stream.<File>empty() : 
Arrays.stream(checkpointDirs)).forEach(checkpointDir -> {
+            if (checkpointDir.getName().contains(latestCheckpointId)) {
+              try {
+                FileUtils.deleteDirectory(checkpointDir);
+              } catch (IOException e) {
+                LOG.error("Failed to remove old checkpointDir: {} for task: 
{}, latestCheckpointId: {}", checkpointDir, taskName, latestCheckpointId);
+              }
+            }
+          });
+      });
+  }
+
+  /**
+   * This method will write offset files to each store's primary and 
checkpoint directories (if given) corresponding
+   * to the set of lastProcessedOffsets given.
+   */

Review comment:
       Can we follow the proper Javadoc format and include the params?
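
   For illustration, a sketch of the tag layout I have in mind. The parameter 
names come from the method in this file; the key types are simplified to 
`String`, and the null checks and class name are assumptions added only to 
make the example self-contained:

```java
import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;

final class JavadocSketch {
  /**
   * Writes offset files to each store's primary directory and, if given, to
   * its checkpoint directory.
   *
   * @param lastProcessedOffsets last processed offset per SSP (SSP shown as a
   *                             String here for brevity)
   * @param checkpointPaths checkpoint directory per store name; may be empty
   * @throws NullPointerException if either argument is null (assumed contract)
   */
  static void writeOffsetFiles(Map<String, String> lastProcessedOffsets, Map<String, Path> checkpointPaths) {
    Objects.requireNonNull(lastProcessedOffsets, "lastProcessedOffsets");
    Objects.requireNonNull(checkpointPaths, "checkpointPaths");
    // Actual file writing is omitted in this sketch.
  }
}
```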
   

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends 
NonTransactionalTaskSideInputStorageManager implements 
TaskSideInputStorageManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalTaskSideInputStorageManager.class);
+
+  public TransactionalTaskSideInputStorageManager(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      Clock clock) {
+    super(taskName, taskMode, storeBaseDir, sideInputStores, storesToSSPs, 
clock);
+  }
+
+  @Override
+  public Map<String, Path> checkpoint(CheckpointId checkpointId) {
+    LOG.info("Creating checkpoint for task: {}", this.taskName);
+
+    Map<String, Path> checkpointPaths = new HashMap<>();
+    stores.forEach((store, storageEngine) ->
+      // TODO what subset of stores to checkpoint? an ssp can be a changelog 
side input for one store and a regular side input for another store
+      storageEngine.checkpoint(checkpointId).ifPresent(path -> 
checkpointPaths.put(store, path))
+    );
+    return checkpointPaths;
+  }
+
+  @Override
+  public void removeOldCheckpoints(String latestCheckpointId) {
+    LOG.info("Removing checkpoints older than: {} for task: {}", 
latestCheckpointId, this.taskName);
+    File[] storeDirs = storeBaseDir.listFiles((dir, name) -> 
stores.containsKey(name));
+    (storeDirs == null ? Stream.<File>empty() : 
Arrays.stream(storeDirs)).forEach(storeDir -> {
+        String taskStoreName = 
storageManagerUtil.getTaskStoreDir(storeBaseDir, storeDir.getName(), taskName, 
taskMode).getName();
+        FileFilter wildcardFileFilter = new WildcardFileFilter(taskStoreName + 
"-*");
+        File[] checkpointDirs = storeDir.listFiles(wildcardFileFilter);
+        (checkpointDirs == null ? Stream.<File>empty() : 
Arrays.stream(checkpointDirs)).forEach(checkpointDir -> {
+            if (checkpointDir.getName().contains(latestCheckpointId)) {
+              try {
+                FileUtils.deleteDirectory(checkpointDir);
+              } catch (IOException e) {
+                LOG.error("Failed to remove old checkpointDir: {} for task: 
{}, latestCheckpointId: {}", checkpointDir, taskName, latestCheckpointId);
+              }
+            }
+          });
+      });
+  }
+
+  /**
+   * This method will write offset files to each store's primary and 
checkpoint directories (if given) corresponding
+   * to the set of lastProcessedOffsets given.
+   */
+  @Override
+  public void writeOffsetFiles(Map<SystemStreamPartition, String> 
lastProcessedOffsets, Map<String, Path> checkpointPaths) {
+    // write offset files for each store's non-checkpoint directories
+    super.writeOffsetFiles(lastProcessedOffsets, Collections.emptyMap());
+
+    // write offset files to checkpoint directories

Review comment:
       What is the behavior on restart if we fail here after having already 
written offset files to the non-checkpoint directories? 

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends 
NonTransactionalTaskSideInputStorageManager implements 
TaskSideInputStorageManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalTaskSideInputStorageManager.class);
+
+  public TransactionalTaskSideInputStorageManager(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      Clock clock) {
+    super(taskName, taskMode, storeBaseDir, sideInputStores, storesToSSPs, 
clock);
+  }
+
+  @Override
+  public Map<String, Path> checkpoint(CheckpointId checkpointId) {
+    LOG.info("Creating checkpoint for task: {}", this.taskName);
+
+    Map<String, Path> checkpointPaths = new HashMap<>();
+    stores.forEach((store, storageEngine) ->
+      // TODO what subset of stores to checkpoint? an ssp can be a changelog 
side input for one store and a regular side input for another store
+      storageEngine.checkpoint(checkpointId).ifPresent(path -> 
checkpointPaths.put(store, path))
+    );
+    return checkpointPaths;
+  }
+
+  @Override
+  public void removeOldCheckpoints(String latestCheckpointId) {
+    LOG.info("Removing checkpoints older than: {} for task: {}", 
latestCheckpointId, this.taskName);
+    File[] storeDirs = storeBaseDir.listFiles((dir, name) -> 
stores.containsKey(name));
+    (storeDirs == null ? Stream.<File>empty() : 
Arrays.stream(storeDirs)).forEach(storeDir -> {

Review comment:
       IMO, this is hard to read. Can we split this into a declaration and a 
for loop? 
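
   Roughly what I mean, as a sketch. The helper and class names are 
hypothetical, and the loop body is reduced to collecting names:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ListFilesSketch {
  // File.listFiles() returns null if the path is not a directory or an I/O
  // error occurs, so normalize that to an empty list in one place.
  static List<File> listOrEmpty(File dir) {
    File[] files = dir.listFiles();
    return files == null ? Collections.<File>emptyList() : Arrays.asList(files);
  }

  static List<String> childNames(File baseDir) {
    // Declaration first ...
    List<File> storeDirs = listOrEmpty(baseDir);
    List<String> names = new ArrayList<>();
    // ... then a plain loop over it.
    for (File storeDir : storeDirs) {
      names.add(storeDir.getName());
    }
    return names;
  }
}
```

   The same helper could cover the nested `checkpointDirs` listing as well.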

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends 
NonTransactionalTaskSideInputStorageManager implements 
TaskSideInputStorageManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalTaskSideInputStorageManager.class);
+
+  public TransactionalTaskSideInputStorageManager(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      Clock clock) {
+    super(taskName, taskMode, storeBaseDir, sideInputStores, storesToSSPs, 
clock);
+  }
+
+  @Override
+  public Map<String, Path> checkpoint(CheckpointId checkpointId) {
+    LOG.info("Creating checkpoint for task: {}", this.taskName);
+
+    Map<String, Path> checkpointPaths = new HashMap<>();
+    stores.forEach((store, storageEngine) ->
+      // TODO what subset of stores to checkpoint? an ssp can be a changelog 
side input for one store and a regular side input for another store
+      storageEngine.checkpoint(checkpointId).ifPresent(path -> 
checkpointPaths.put(store, path))
+    );
+    return checkpointPaths;
+  }
+
+  @Override
+  public void removeOldCheckpoints(String latestCheckpointId) {
+    LOG.info("Removing checkpoints older than: {} for task: {}", 
latestCheckpointId, this.taskName);
+    File[] storeDirs = storeBaseDir.listFiles((dir, name) -> 
stores.containsKey(name));
+    (storeDirs == null ? Stream.<File>empty() : 
Arrays.stream(storeDirs)).forEach(storeDir -> {
+        String taskStoreName = 
storageManagerUtil.getTaskStoreDir(storeBaseDir, storeDir.getName(), taskName, 
taskMode).getName();
+        FileFilter wildcardFileFilter = new WildcardFileFilter(taskStoreName + 
"-*");
+        File[] checkpointDirs = storeDir.listFiles(wildcardFileFilter);
+        (checkpointDirs == null ? Stream.<File>empty() : 
Arrays.stream(checkpointDirs)).forEach(checkpointDir -> {

Review comment:
       Same as above: split into two statements for readability.

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/RunLoopTask.scala
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.scheduler.EpochTimeScheduler
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.{ReadableCoordinator, TaskCallbackFactory}
+import org.apache.samza.util.Logging
+
+abstract class RunLoopTask extends Logging {

Review comment:
       Can we have this written in Java instead of Scala?

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.executors.KeyBasedExecutorService;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+// TODO pick a better name for this class?
+
+/**
+ * This class encapsulates all processing logic / state for all side input 
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new 
StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  // marks the offsets per SSP that must be bootstrapped to, inclusive. 
container startup will block until these offsets have been processed
+  private final Map<SystemStreamPartition, String> sspBootstrapOffsets;
+
+  // these objects are SHARED WITH CONTAINER STORAGE MANAGER
+  // used to coordinate updates of checkpoint offsets by 
ContainerStorageManager
+  private final Map<SystemStreamPartition, Object> sspLockObjects;
+  // indicates the latest checkpoint per SSP. updated by 
ContainerStorageManager background thread
+  private final Map<SystemStreamPartition, Optional<String>> 
checkpointedOffsets;
+  // used to block container until each SSP reaches its bootstrap offset
+  private final CountDownLatch sideInputTasksCaughtUp;
+
+  private final Map<SystemStreamPartition, String> startingOffsets;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets;
+  private final KeyBasedExecutorService checkpointedSSPExecutor;
+  private final KeyBasedExecutorService nonCheckpointedSSPExecutor;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskSideInputStorageManager taskSideInputStorageManager,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      CountDownLatch sideInputTasksCaughtUp,
+      Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSSPMetadata,
+      Map<SystemStreamPartition, Object> sspLockObjects,
+      Map<SystemStreamPartition, Optional<String>> checkpointOffsets) {
+    this.taskName = taskName;
+    this.taskSideInputStorageManager = taskSideInputStorageManager;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.sideInputTasksCaughtUp = sideInputTasksCaughtUp;
+    this.storeToProcessor = storeToProcessor;
+    this.sspLockObjects = Collections.unmodifiableMap(sspLockObjects);
+    this.checkpointedOffsets = Collections.unmodifiableMap(checkpointOffsets);
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, 
storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    // for non-checkpointed SSPs, use their newest offset. for checkpointed 
SSPs, use their current checkpoint
+    this.sspBootstrapOffsets = new HashMap<>();
+    initialSSPMetadata.entrySet().stream()
+        // only SSPs for this task
+        .filter(entry -> this.sspToStores.containsKey(entry.getKey()))
+        // that do not have checkpoints
+        .filter(entry -> !this.checkpointedOffsets.containsKey(entry.getKey()))
+        .forEach(entry -> this.sspBootstrapOffsets.put(entry.getKey(), entry.getValue().getNewestOffset()));
+    this.checkpointedOffsets.entrySet().stream()
+        .filter(entry -> entry.getValue().isPresent())
+        .forEach(entry -> this.sspBootstrapOffsets.put(entry.getKey(), entry.getValue().get()));
+
+    this.lastProcessedOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.startingOffsets = getStartingOffsets(this.lastProcessedOffsets, getOldestOffsets());
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, true));
+
+
+    Set<String> checkpointedStores = this.sspToStores.entrySet().stream()
+        .filter(sspAndStores -> this.checkpointedOffsets.containsKey(sspAndStores.getKey()))
+        .flatMap(sspAndStores -> sspAndStores.getValue().stream())
+        .collect(Collectors.toSet());
+
+    Set<String> nonCheckpointedStores = this.sspToStores.entrySet().stream()
+        .filter(sspAndStores -> !this.checkpointedOffsets.containsKey(sspAndStores.getKey()))
+        .flatMap(sspAndStores -> sspAndStores.getValue().stream())
+        .collect(Collectors.toSet());
+
+    this.checkpointedSSPExecutor = new KeyBasedExecutorService(Math.max(1, checkpointedStores.size()));
+    this.nonCheckpointedSSPExecutor = new KeyBasedExecutorService(Math.max(1, nonCheckpointedStores.size()));
+  }
+
+  public void process(IncomingMessageEnvelope envelope, TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();

Review comment:
       The reason we have the callback factory at the TaskInstance layer is to 
control precisely when the callback timeout starts: right before the message is 
dispatched to user logic. If we consider `TaskSideInputHandler` to be similar to 
user logic, then we should minimize the factory's exposure to the handler; if 
not, we should move this closer to the `SideInputsProcessor` to reflect the 
semantics of the callback timeout.
   
   It looks like we may need to move this closer to the actual processing, since 
we can block on the lock here, and that wait could trigger the callback timeout.
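
   To illustrate the concern, here is a minimal, self-contained sketch. It does 
not use the Samza API: `TimedCallback`, `createBeforeLock`, `createAfterLock`, 
and `measure` are hypothetical names, and the callback is assumed to start its 
timeout clock when it is created. It only shows that creating the callback 
before taking the lock makes the lock wait count against the timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class CallbackTimingSketch {
  /** Hypothetical stand-in for a callback whose timeout clock starts at creation. */
  static final class TimedCallback {
    private final long createdAtNanos = System.nanoTime();

    long elapsedMillis() {
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdAtNanos);
    }
  }

  /** Callback created first: time spent waiting for the lock counts against it. */
  static long createBeforeLock(Object lock, Supplier<TimedCallback> factory) {
    TimedCallback callback = factory.get();
    synchronized (lock) {
      return callback.elapsedMillis();
    }
  }

  /** Callback created once the lock is held, right before processing would start. */
  static long createAfterLock(Object lock, Supplier<TimedCallback> factory) {
    synchronized (lock) {
      TimedCallback callback = factory.get();
      return callback.elapsedMillis();
    }
  }

  /** Holds the lock on another thread for holdMillis and reports what the callback saw. */
  static long measure(boolean beforeLock, long holdMillis) {
    Object lock = new Object();
    CountDownLatch lockHeld = new CountDownLatch(1);
    Thread holder = new Thread(() -> {
      synchronized (lock) {
        lockHeld.countDown();
        try {
          Thread.sleep(holdMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    holder.start();
    try {
      lockHeld.await();
      long elapsed = beforeLock
          ? createBeforeLock(lock, TimedCallback::new)
          : createAfterLock(lock, TimedCallback::new);
      holder.join();
      return elapsed;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("created before lock: " + measure(true, 200) + " ms");
    System.out.println("created after lock: " + measure(false, 200) + " ms");
  }
}
```

   In the first variant the callback has already aged by roughly the time the 
lock was held elsewhere; in the second it is fresh when processing begins, which 
is the behavior the comment above is asking for.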

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.executors.KeyBasedExecutorService;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+// TODO pick a better name for this class?

Review comment:
       Is this still a TODO?

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends NonTransactionalTaskSideInputStorageManager implements TaskSideInputStorageManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionalTaskSideInputStorageManager.class);
+
+  public TransactionalTaskSideInputStorageManager(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      Clock clock) {
+    super(taskName, taskMode, storeBaseDir, sideInputStores, storesToSSPs, clock);
+  }
+
+  @Override
+  public Map<String, Path> checkpoint(CheckpointId checkpointId) {
+    LOG.info("Creating checkpoint for task: {}", this.taskName);
+
+    Map<String, Path> checkpointPaths = new HashMap<>();
+    stores.forEach((store, storageEngine) ->
+      // TODO what subset of stores to checkpoint? an ssp can be a changelog side input for one store and a regular side input for another store
+      storageEngine.checkpoint(checkpointId).ifPresent(path -> checkpointPaths.put(store, path))
+    );
+    return checkpointPaths;
+  }
+
+  @Override
+  public void removeOldCheckpoints(String latestCheckpointId) {
+    LOG.info("Removing checkpoints older than: {} for task: {}", latestCheckpointId, this.taskName);
+    File[] storeDirs = storeBaseDir.listFiles((dir, name) -> stores.containsKey(name));
+    (storeDirs == null ? Stream.<File>empty() : Arrays.stream(storeDirs)).forEach(storeDir -> {

Review comment:
       On second thought, I wonder whether we can eliminate the null altogether 
and default it to an empty list.
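
   One possible shape for this, as a sketch only: `listDirsOrEmpty` is a 
hypothetical helper name, not something in the PR. It relies on the documented 
behavior that `File#listFiles` returns null when the path is not a directory or 
an I/O error occurs, and maps that case to an empty list so callers never see 
null.

```java
import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class StoreDirListingSketch {
  /**
   * Hypothetical helper: returns the matching entries under baseDir, or an
   * empty list when File#listFiles returns null (not a directory, or I/O error).
   */
  static List<File> listDirsOrEmpty(File baseDir, FilenameFilter filter) {
    File[] dirs = baseDir.listFiles(filter);
    return dirs == null ? Collections.emptyList() : Arrays.asList(dirs);
  }

  public static void main(String[] args) {
    File missing = new File("no-such-store-base-dir");
    // A missing base directory yields an empty list, so the loop body never runs.
    listDirsOrEmpty(missing, (dir, name) -> true)
        .forEach(storeDir -> System.out.println(storeDir.getName()));
  }
}
```

   With a helper like this, the call site could iterate over the result 
directly instead of branching on null inline.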





