rmatharu commented on a change in pull request #1005: SAMZA-2170: Enabling writing of both new and old format offset files for stores and side-input-stores
URL: https://github.com/apache/samza/pull/1005#discussion_r277117739
##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -139,13 +161,26 @@ public static boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition
* @param storeName the store name to use
* @param taskName the task name which is referencing the store
* @param offsets The SSP-offset to write
+ * @param isSideInput true if store is a side-input store, false if it is a regular store
* @throws IOException because of deserializing to json
*/
public static void writeOffsetFile(File storeBaseDir, String storeName,
TaskName taskName, TaskMode taskMode,
- Map<SystemStreamPartition, String> offsets) throws IOException {
- File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), OFFSET_FILE_NAME);
+ Map<SystemStreamPartition, String> offsets, boolean isSideInput) throws IOException {
+
+ // First, we write the new-format offset file
+ File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), OFFSET_FILE_NAME_NEW);
String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
FileUtil.writeWithChecksum(offsetFile, fileContents);
+
+ // Now we write the old format offset file, which are different for store-offset and side-inputs
+ if (isSideInput) {
+ offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+ fileContents = OBJECT_WRITER.writeValueAsString(offsets);
+ FileUtil.writeWithChecksum(offsetFile, fileContents);
+ } else {
+ offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), OFFSET_FILE_NAME_LEGACY);
+ FileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue());
Review comment:
Yup, I'll start a separate PR with these changes and tests.
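The dual-write logic in the hunk can be sketched as a standalone Java class. This is a minimal illustration under stated assumptions, not Samza's implementation: SSPs are modeled as plain strings, a hand-rolled serializer stands in for Jackson's `OBJECT_WRITER`, and this `writeWithChecksum` uses its own layout (CRC32 value, byte length, UTF-8 bytes, then an atomic rename), which may not match `FileUtil`'s real on-disk format. The string values given to `OFFSET_FILE_NAME_NEW`, `OFFSET_FILE_NAME_LEGACY` and `SIDE_INPUT_OFFSET_FILE_NAME_LEGACY` are placeholders, and the class name `DualOffsetWriterSketch` and the helpers `toJson` and `readWithChecksum` are hypothetical.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch of writing new- and old-format offset files side by side.
final class DualOffsetWriterSketch {
  // Placeholder file names; the real constants live in StorageManagerUtil.
  static final String OFFSET_FILE_NAME_NEW = "OFFSET-v2";
  static final String OFFSET_FILE_NAME_LEGACY = "OFFSET";
  static final String SIDE_INPUT_OFFSET_FILE_NAME_LEGACY = "SIDE-INPUT-OFFSETS";

  static void writeOffsetFile(File partitionDir, Map<String, String> offsets, boolean isSideInput)
      throws IOException {
    // Guard added by this sketch (the diff takes the first entry unchecked);
    // it runs before any file is written so a bad call leaves the directory untouched.
    if (!isSideInput && offsets.size() != 1) {
      throw new IllegalArgumentException("expected one changelog SSP, got " + offsets.size());
    }
    String json = toJson(offsets);
    // 1. New-format file: the full SSP -> offset map, for both store types.
    writeWithChecksum(new File(partitionDir, OFFSET_FILE_NAME_NEW), json);
    if (isSideInput) {
      // 2a. Legacy side-input file: the same JSON map under the old file name.
      writeWithChecksum(new File(partitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY), json);
    } else {
      // 2b. Legacy store file: only the bare offset of the single changelog SSP.
      writeWithChecksum(new File(partitionDir, OFFSET_FILE_NAME_LEGACY),
          offsets.values().iterator().next());
    }
  }

  // Stand-in for Jackson: sorted keys, escapes only quotes and backslashes.
  static String toJson(Map<String, String> offsets) {
    StringBuilder sb = new StringBuilder("{");
    boolean first = true;
    for (Map.Entry<String, String> e : new TreeMap<>(offsets).entrySet()) {
      if (!first) sb.append(',');
      first = false;
      sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
    }
    return sb.append('}').toString();
  }

  private static String quote(String s) {
    return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  // Assumed layout: CRC32 of the payload, payload length, payload bytes.
  // Written to a temp file first, then atomically renamed over the target.
  static void writeWithChecksum(File file, String data) throws IOException {
    byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
    CRC32 crc = new CRC32();
    crc.update(bytes);
    File tmp = new File(file.getParentFile(), file.getName() + ".tmp");
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(tmp))) {
      out.writeLong(crc.getValue());
      out.writeInt(bytes.length);
      out.write(bytes);
    }
    Files.move(tmp.toPath(), file.toPath(), StandardCopyOption.ATOMIC_MOVE);
  }

  // Reads back a file written by writeWithChecksum and verifies its checksum.
  static String readWithChecksum(File file) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
      long expected = in.readLong();
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      CRC32 crc = new CRC32();
      crc.update(bytes);
      if (crc.getValue() != expected) {
        throw new IOException("checksum mismatch in " + file);
      }
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }
}
```

Both store types get the new-format file holding the full SSP-to-offset map; only the legacy file differs by store type. The sketch rejects a regular store whose map does not hold exactly one SSP before writing anything, whereas the diff simply reads whichever entry the map's iterator returns first.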
----------------------------------------------------------------
This is an automated message from the Apache Git Service.