This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c6e50a5  SAMZA-2170: Enabling writing of both new and old format 
offset files for stores and side-input-stores
c6e50a5 is described below

commit c6e50a5117b78d2fae570f5286ceec108a59ecc8
Author: Ray Matharu <[email protected]>
AuthorDate: Fri Apr 19 16:26:35 2019 -0700

    SAMZA-2170: Enabling writing of both new and old format offset files for 
stores and side-input-stores
    
    After Samza 1.1, the offset file for stores and sideinputs has been unified 
and is versioned.
    However, this Jira adds the logic in code to read and write both this new 
and old format. Because of this apps can switch between 1.0 and 1.1 versions 
seamlessly.
    
    Note that the old format and filenames for store and side-input offset 
differed.
    
    Author: Ray Matharu <[email protected]>
    
    Reviewers: Prateek Maheshwari <[email protected]>
    
    Closes #1005 from rmatharu/bugfix-offset
---
 .../apache/samza/storage/StorageManagerUtil.java   | 98 +++++++++++++++++++---
 .../samza/storage/TaskSideInputStorageManager.java |  8 +-
 .../samza/storage/ContainerStorageManager.java     |  6 +-
 .../apache/samza/storage/TaskStorageManager.scala  |  2 +-
 .../samza/storage/TestTaskStorageManager.scala     | 26 +++---
 .../apache/samza/monitor/LocalStoreMonitor.java    |  2 +-
 .../samza/monitor/TestLocalStoreMonitor.java       |  2 +-
 7 files changed, 109 insertions(+), 35 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index e662fa1..c064a8e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -45,7 +45,9 @@ import org.slf4j.LoggerFactory;
 
 public class StorageManagerUtil {
   private static final Logger LOG = 
LoggerFactory.getLogger(StorageManagerUtil.class);
-  public static final String OFFSET_FILE_NAME = "OFFSET";
+  public static final String OFFSET_FILE_NAME_NEW = "OFFSET-v2";
+  public static final String OFFSET_FILE_NAME_LEGACY = "OFFSET";
+  public static final String SIDE_INPUT_OFFSET_FILE_NAME_LEGACY = 
"SIDE-INPUT-OFFSETS";
   private static final ObjectMapper OBJECT_MAPPER = 
SamzaObjectMapper.getObjectMapper();
   private static final TypeReference<Map<SystemStreamPartition, String>> 
OFFSETS_TYPE_REFERENCE =
             new TypeReference<Map<SystemStreamPartition, String>>() { };
@@ -92,14 +94,33 @@ public class StorageManagerUtil {
    * @param storeDir the base directory of the store
    * @param storeDeleteRetentionInMs store delete retention in millis
    * @param currentTimeMs current time in ms
+   * @param isSideInput true if store is a side-input store, false if it is a 
regular store
    * @return true if the store is stale, false otherwise
    */
-  public static boolean isStaleStore(File storeDir, long 
storeDeleteRetentionInMs, long currentTimeMs) {
+  public static boolean isStaleStore(File storeDir, long 
storeDeleteRetentionInMs, long currentTimeMs, boolean isSideInput) {
+    long offsetFileLastModifiedTime;
     boolean isStaleStore = false;
     String storePath = storeDir.toPath().toString();
+
     if (storeDir.exists()) {
-      File offsetFileRef = new File(storeDir, OFFSET_FILE_NAME);
-      long offsetFileLastModifiedTime = offsetFileRef.lastModified();
+
+      // We check if the new offset-file exists, if so we use its 
last-modified time, if it doesn't we use the legacy file
+      // depending on if it is a side-input or not,
+      // if neither exists, we use 0L (the defauilt return value of 
lastModified() when file does not exist
+      File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW);
+      File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
+      File sideInputOffsetFileRefLegacy = new File(storeDir, 
SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+
+      if (offsetFileRefNew.exists()) {
+        offsetFileLastModifiedTime = offsetFileRefNew.lastModified();
+      } else if (!isSideInput && offsetFileRefLegacy.exists()) {
+        offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified();
+      } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
+        offsetFileLastModifiedTime = 
sideInputOffsetFileRefLegacy.lastModified();
+      } else {
+        offsetFileLastModifiedTime = 0L;
+      }
+
       if ((currentTimeMs - offsetFileLastModifiedTime) >= 
storeDeleteRetentionInMs) {
         LOG.info(
             String.format("Store: %s is stale since lastModifiedTime of offset 
file: %d, is older than store deleteRetentionMs: %d.",
@@ -117,12 +138,13 @@ public class StorageManagerUtil {
    *
    * @param storeDir the base directory of the store
    * @param storeSSPs storeSSPs (if any) associated with the store
+   * @param isSideInput true if store is a side-input store, false if it is a 
regular store
    * @return true if the offset file is valid. false otherwise.
    */
-  public static boolean isOffsetFileValid(File storeDir, 
Set<SystemStreamPartition> storeSSPs) {
+  public static boolean isOffsetFileValid(File storeDir, 
Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
     boolean hasValidOffsetFile = false;
     if (storeDir.exists()) {
-      Map<SystemStreamPartition, String> offsetContents = 
readOffsetFile(storeDir, storeSSPs);
+      Map<SystemStreamPartition, String> offsetContents = 
readOffsetFile(storeDir, storeSSPs, isSideInput);
       if (offsetContents != null && !offsetContents.isEmpty() && 
offsetContents.keySet().equals(storeSSPs)) {
         hasValidOffsetFile = true;
       } else {
@@ -139,13 +161,26 @@ public class StorageManagerUtil {
    * @param storeName the store name to use
    * @param taskName the task name which is referencing the store
    * @param offsets The SSP-offset to write
+   * @param isSideInput true if store is a side-input store, false if it is a 
regular store
    * @throws IOException because of deserializing to json
    */
   public static void writeOffsetFile(File storeBaseDir, String storeName, 
TaskName taskName, TaskMode taskMode,
-      Map<SystemStreamPartition, String> offsets) throws IOException {
-    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, taskMode), OFFSET_FILE_NAME);
+      Map<SystemStreamPartition, String> offsets, boolean isSideInput) throws 
IOException {
+
+    // First, we write the new-format offset file
+    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, taskMode), OFFSET_FILE_NAME_NEW);
     String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
     FileUtil.writeWithChecksum(offsetFile, fileContents);
+
+    // Now we write the old format offset file, which are different for 
store-offset and side-inputs
+    if (isSideInput) {
+      offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, taskMode), SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      fileContents = OBJECT_WRITER.writeValueAsString(offsets);
+      FileUtil.writeWithChecksum(offsetFile, fileContents);
+    } else {
+      offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, taskMode), OFFSET_FILE_NAME_LEGACY);
+      FileUtil.writeWithChecksum(offsetFile, 
offsets.entrySet().iterator().next().getValue());
+    }
   }
 
   /**
@@ -155,7 +190,15 @@ public class StorageManagerUtil {
    * @param taskName the task name which is referencing the store
    */
   public static void deleteOffsetFile(File storeBaseDir, String storeName, 
TaskName taskName) {
-    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, TaskMode.Active), OFFSET_FILE_NAME);
+    deleteOffsetFile(storeBaseDir, storeName, taskName, OFFSET_FILE_NAME_NEW);
+    deleteOffsetFile(storeBaseDir, storeName, taskName, 
OFFSET_FILE_NAME_LEGACY);
+  }
+
+  /**
+   * Delete the given offsetFile for the store if it exists.
+   */
+  private static void deleteOffsetFile(File storeBaseDir, String storeName, 
TaskName taskName, String offsetFileName) {
+    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, TaskMode.Active), offsetFileName);
     if (offsetFile.exists()) {
       FileUtil.rm(offsetFile);
     }
@@ -172,16 +215,47 @@ public class StorageManagerUtil {
   }
 
   /**
+   * Read and return the offset from the directory's offset file
+   *
+   * @param storagePartitionDir the base directory of the store
+   * @param storeSSPs SSPs associated with the store (if any)
+   * @param isSideInput, true if the store is a side-input store, false 
otherwise
+   * @return the content of the offset file if it exists for the store, null 
otherwise.
+   */
+  public static Map<SystemStreamPartition, String> readOffsetFile(File 
storagePartitionDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) 
{
+
+    File offsetFileRefNew = new File(storagePartitionDir, 
OFFSET_FILE_NAME_NEW);
+    File offsetFileRefLegacy = new File(storagePartitionDir, 
OFFSET_FILE_NAME_LEGACY);
+    File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, 
SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+
+    // First we check if the new offset file exists, if it does we read 
offsets from it regardless of old or new format,
+    // if it doesn't exist, we check if the store is non-sideInput and 
legacy-offset file exists, if so we read offsets
+    // from the old non-side-input offset file (regardless of the offset 
format),
+    // last, we check if the store is a sideInput and the old 
side-input-offset file exists
+    if (offsetFileRefNew.exists()) {
+      return readOffsetFile(storagePartitionDir, offsetFileRefNew.getName(), 
storeSSPs);
+    } else if (!isSideInput && offsetFileRefLegacy.exists()) {
+      return readOffsetFile(storagePartitionDir, 
offsetFileRefLegacy.getName(), storeSSPs);
+    } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
+      return readOffsetFile(storagePartitionDir, 
sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+    } else {
+      return new HashMap<>();
+    }
+
+  }
+
+  /**
    * Read and return the contents of the offset file.
    *
    * @param storagePartitionDir the base directory of the store
+   * @param offsetFileName the name of the offset file
    * @param storeSSPs SSPs associated with the store (if any)
    * @return the content of the offset file if it exists for the store, null 
otherwise.
    */
-  public static Map<SystemStreamPartition, String> readOffsetFile(File 
storagePartitionDir, Set<SystemStreamPartition> storeSSPs) {
+  private static Map<SystemStreamPartition, String> readOffsetFile(File 
storagePartitionDir, String offsetFileName, Set<SystemStreamPartition> 
storeSSPs) {
     Map<SystemStreamPartition, String> offsets = new HashMap<>();
     String fileContents = null;
-    File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
+    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
     String storePath = storagePartitionDir.getPath();
 
     if (offsetFileRef.exists()) {
@@ -190,7 +264,7 @@ public class StorageManagerUtil {
         fileContents = FileUtil.readWithChecksum(offsetFileRef);
         offsets = OBJECT_MAPPER.readValue(fileContents, 
OFFSETS_TYPE_REFERENCE);
       } catch (JsonParseException | JsonMappingException e) {
-        LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+        LOG.info("Exception in json-parsing offset file {} {}, reading as 
string offset-value", storagePartitionDir.toPath(), offsetFileName);
         final String finalFileContents = fileContents;
         offsets = (storeSSPs.size() == 1) ? 
storeSSPs.stream().collect(Collectors.toMap(ssp -> ssp, offset -> 
finalFileContents)) : offsets;
       } catch (Exception e) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 5e9c6cf..59c31d9 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -260,7 +260,7 @@ public class TaskSideInputStorageManager {
               .collect(Collectors.toMap(Function.identity(), 
lastProcessedOffsets::get));
 
             try {
-              StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, 
taskName, taskMode, offsets);
+              StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, 
taskName, taskMode, offsets, true);
             } catch (Exception e) {
               throw new SamzaException("Failed to write offset file for side 
input store: " + storeName, e);
             }
@@ -285,7 +285,7 @@ public class TaskSideInputStorageManager {
         if (isValidSideInputStore(storeName, storeLocation)) {
           try {
 
-            Map<SystemStreamPartition, String> offsets = 
StorageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName));
+            Map<SystemStreamPartition, String> offsets = 
StorageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName), 
true);
             fileOffsets.putAll(offsets);
           } catch (Exception e) {
             LOG.warn("Failed to load the offset file for side input store:" + 
storeName, e);
@@ -368,8 +368,8 @@ public class TaskSideInputStorageManager {
 
   private boolean isValidSideInputStore(String storeName, File storeLocation) {
     return isPersistedStore(storeName)
-        && !StorageManagerUtil.isStaleStore(storeLocation, 
STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
-        && StorageManagerUtil.isOffsetFileValid(storeLocation, 
storeToSSps.get(storeName));
+        && !StorageManagerUtil.isStaleStore(storeLocation, 
STORE_DELETE_RETENTION_MS, clock.currentTimeMillis(), true)
+        && StorageManagerUtil.isOffsetFileValid(storeLocation, 
storeToSSps.get(storeName), true);
   }
 
   private boolean isPersistedStore(String storeName) {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index a9443b4..61505f4 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -936,7 +936,7 @@ public class ContainerStorageManager {
 
             SystemStreamPartition changelogSSP = new 
SystemStreamPartition(changelogSystemStreams.get(storeName), 
taskModel.getChangelogPartition());
             Map<SystemStreamPartition, String> offset =
-                StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, 
Collections.singleton(changelogSSP));
+                StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, 
Collections.singleton(changelogSSP), false);
             LOG.info("Read offset {} for the store {} from logged storage 
partition directory {}", offset, storeName, loggedStorePartitionDir);
 
             if (offset.containsKey(changelogSSP)) {
@@ -965,8 +965,8 @@ public class ContainerStorageManager {
 
       if (changelogSystemStreams.containsKey(storeName)) {
         SystemStreamPartition changelogSSP = new 
SystemStreamPartition(changelogSystemStreams.get(storeName), 
taskModel.getChangelogPartition());
-        return 
this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() && 
StorageManagerUtil.isOffsetFileValid(loggedStoreDir, 
Collections.singleton(changelogSSP))
-            && !StorageManagerUtil.isStaleStore(loggedStoreDir, 
changeLogDeleteRetentionInMs, clock.currentTimeMillis());
+        return 
this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() && 
StorageManagerUtil.isOffsetFileValid(loggedStoreDir, 
Collections.singleton(changelogSSP), false)
+            && !StorageManagerUtil.isStaleStore(loggedStoreDir, 
changeLogDeleteRetentionInMs, clock.currentTimeMillis(), false);
       }
 
       return false;
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c2a2c7e..046d9a6 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -93,7 +93,7 @@ class TaskStorageManager(
           debug("Storing offset for store in OFFSET file ")
 
           // TaskStorageManagers are only spun-up for active tasks
-          StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, 
taskName, TaskMode.Active, Map(ssp -> newestOffset).asJava)
+          StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, 
taskName, TaskMode.Active, Map(ssp -> newestOffset).asJava, false)
           debug("Successfully stored offset %s for store %s in OFFSET file " 
format(newestOffset, storeName))
         } else {
           //if newestOffset is null, then it means the store is (or has 
become) empty. No need to persist the offset file
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 424a0d0..3a4b2b5 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -84,7 +84,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val ssp = new SystemStreamPartition(ss, partition)
     val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
-    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
 
     val mockStorageEngine: StorageEngine = 
createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, 
storeFile)
 
@@ -263,7 +263,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -281,7 +281,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // is older than deletionRetention of the changeLog.
     val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     storeDirectory.setLastModified(0)
-    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
     offsetFile.createNewFile()
     FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
@@ -298,7 +298,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -315,7 +315,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
 
     val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
-    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -355,9 +355,9 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
     val anotherOffsetPath = new File(
-      
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 store, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
+      
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 store, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -393,7 +393,7 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
@@ -440,7 +440,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), 
partition)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -487,7 +487,7 @@ class TestTaskStorageManager extends MockitoSugar {
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
 
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -567,14 +567,14 @@ class TestTaskStorageManager extends MockitoSugar {
     // Create a file in old single-offset format, with a sample offset
     val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
-    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
     val sampleOldOffset = "912321"
     FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
 
 
     // read offset against a given ssp from the file
     var ssp = new SystemStreamPartition("kafka", "test-stream", new 
Partition(0))
-    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, 
Set(ssp).asJava)
+    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, 
Set(ssp).asJava, false)
     assertTrue(offsets.get(ssp).equals(sampleOldOffset))
   }
 
@@ -590,7 +590,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val storeFile = new File(storeDirectory, "store.sst")
 
     if (writeOffsetFile) {
-      val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
+      val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME_NEW)
       if (fileOffset != null) {
         FileUtil.writeWithChecksum(offsetFile, fileOffset)
       } else {
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index c711e8d..4da49f4 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -50,7 +50,7 @@ public class LocalStoreMonitor implements Monitor {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalStoreMonitor.class);
 
-  private static final String OFFSET_FILE_NAME = 
StorageManagerUtil.OFFSET_FILE_NAME;
+  private static final String OFFSET_FILE_NAME = 
StorageManagerUtil.OFFSET_FILE_NAME_NEW;
 
   private final JobsClient jobsClient;
 
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java 
b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index f297a11..15f0df9 100644
--- 
a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ 
b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -196,7 +196,7 @@ public class TestLocalStoreMonitor {
   }
 
   private static File createOffsetFile(File taskStoreDir) throws Exception {
-    File offsetFile = new File(taskStoreDir, 
StorageManagerUtil.OFFSET_FILE_NAME);
+    File offsetFile = new File(taskStoreDir, 
StorageManagerUtil.OFFSET_FILE_NAME_NEW);
     offsetFile.createNewFile();
     return offsetFile;
   }

Reply via email to