Repository: samza
Updated Branches:
  refs/heads/master 05113c339 -> adfc4bfc4


SAMZA-1486; Checkpoint manager implementation with Azure Table

vjagadish1989

Author: Daniel Chen <[email protected]>

Reviewers: Jagadish<[email protected]>

Closes #341 from dxichen/azure-checkpoint-manager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adfc4bfc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adfc4bfc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adfc4bfc

Branch: refs/heads/master
Commit: adfc4bfc40a5d92b3cc7e7813dd1a73c0c486e0e
Parents: 05113c3
Author: Daniel Chen <[email protected]>
Authored: Tue Nov 7 17:25:58 2017 -0800
Committer: Jagadish <[email protected]>
Committed: Tue Nov 7 17:25:58 2017 -0800

----------------------------------------------------------------------
 .../samza/checkpoint/CheckpointManager.java     |   6 +-
 .../azure/AzureCheckpointManager.java           | 236 +++++++++++++++++++
 .../azure/AzureCheckpointManagerFactory.java    |  33 +++
 .../checkpoint/azure/TaskCheckpointEntity.java  |  43 ++++
 .../org/apache/samza/config/AzureConfig.java    |   2 +-
 .../coordinator/AzureCoordinationUtils.java     |   2 +-
 .../samza/coordinator/AzureJobCoordinator.java  |   2 +-
 .../azure/ITestAzureCheckpointManager.java      | 181 ++++++++++++++
 8 files changed, 501 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 
b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 10f166c..bc75351 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -44,10 +44,14 @@ public interface CheckpointManager {
   /**
    * Returns the last recorded checkpoint for a specified taskName.
    * @param taskName Specific Samza taskName for which to get the last 
checkpoint of.
-   * @return A Checkpoint object with the recorded offset data of the 
specified partition.
+   * @return A Checkpoint object with the recorded offset data of the specified partition,
+   *         or null if there are no recorded checkpoints for the task.
    */
   Checkpoint readLastCheckpoint(TaskName taskName);
 
+  /**
+   * Perform teardown operations for the Manager. Checkpoints are still 
persisted.
+   */
   void stop();
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
new file mode 100644
index 0000000..df3e490
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
@@ -0,0 +1,236 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.table.*;
import org.apache.samza.AzureClient;
import org.apache.samza.AzureException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.AzureConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+
+/**
+ * Azure checkpoint manager is used to store checkpoints in a Azure Table.
+ * All the task checkpoints are added to the a single table named 
"SamzaTaskCheckpoints".
+ * The table entities take the following form:
+ *
+ * +-----------------+---------------------+-------------------+
+ * |                 |     Serialized      |                   |
+ * |   TaskName      |     JSON SSP        |     Offset        |
+ * |                 |                     |                   |
+ * +-----------------+---------------------+-------------------+
+ *
+ *  Each entity have a partitionKey set as the TaskName and the rowKey set as 
the SSP.
+ */
+public class AzureCheckpointManager implements CheckpointManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AzureCheckpointManager.class.getName());
+  private static final String PARTITION_KEY = "PartitionKey";
+
+  public static final int MAX_WRITE_BATCH_SIZE = 100;
+  public static final String CHECKPOINT_MANAGER_TABLE_NAME = 
"SamzaTaskCheckpoints";
+  public static final String SYSTEM_PROP_NAME = "system";
+  public static final String STREAM_PROP_NAME = "stream";
+  public static final String PARTITION_PROP_NAME = "partition";
+
+  private final String storageConnectionString;
+  private final AzureClient azureClient;
+  private CloudTable cloudTable;
+
+  private final Set<TaskName> taskNames = new HashSet<>();
+  private final JsonSerdeV2<Map<String, String>> jsonSerde = new 
JsonSerdeV2<>();
+
+  AzureCheckpointManager(AzureConfig azureConfig) {
+    storageConnectionString = azureConfig.getAzureConnectionString();
+    azureClient = new AzureClient(storageConnectionString);
+  }
+
+  @Override
+  public void start() {
+    try {
+      // Create the table if it doesn't exist.
+      cloudTable = 
azureClient.getTableClient().getTableReference(CHECKPOINT_MANAGER_TABLE_NAME);
+      cloudTable.createIfNotExists();
+
+    } catch (URISyntaxException e) {
+      LOG.error("Connection string {} specifies an invalid URI while creating 
checkpoint table.",
+              storageConnectionString);
+      throw new AzureException(e);
+
+    } catch (StorageException e) {
+      LOG.error("Azure Storage failed when creating checkpoint table", e);
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void register(TaskName taskName) {
+    taskNames.add(taskName);
+  }
+
+  @Override
+  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException("writing checkpoint of unregistered task");
+    }
+
+    TableBatchOperation batchOperation = new TableBatchOperation();
+
+    Iterator<Map.Entry<SystemStreamPartition, String>> iterator = 
checkpoint.getOffsets().entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<SystemStreamPartition, String> entry = iterator.next();
+      SystemStreamPartition ssp = entry.getKey();
+      String offset = entry.getValue();
+
+      // Create table entity
+      TaskCheckpointEntity taskCheckpoint = new 
TaskCheckpointEntity(taskName.toString(),
+              serializeSystemStreamPartition(ssp), offset);
+
+      // Add to batch operation
+      batchOperation.insertOrReplace(taskCheckpoint);
+
+      // Execute when batch reaches capacity or this is the last item
+      if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || 
!iterator.hasNext()) {
+        try {
+          cloudTable.execute(batchOperation);
+        } catch (StorageException e) {
+          LOG.error("Executing batch failed for task: {}", taskName);
+          throw new AzureException(e);
+        }
+        batchOperation.clear();
+      }
+    }
+  }
+
+  private String serializeSystemStreamPartition(SystemStreamPartition ssp) {
+    // Create the Json string for SystemStreamPartition
+    Map<String, String> sspMap = new HashMap<>();
+
+    sspMap.put(SYSTEM_PROP_NAME, ssp.getSystem());
+    sspMap.put(STREAM_PROP_NAME, ssp.getStream());
+    sspMap.put(PARTITION_PROP_NAME, 
String.valueOf(ssp.getPartition().getPartitionId()));
+
+    return new String(jsonSerde.toBytes(sspMap));
+  }
+
+  private SystemStreamPartition deserializeSystemStreamPartition(String 
serializedSSP) {
+    Map<String, String> sspPropertiesMap = 
jsonSerde.fromBytes(serializedSSP.getBytes());
+
+    String systemName = sspPropertiesMap.get(SYSTEM_PROP_NAME);
+    String streamName = sspPropertiesMap.get(STREAM_PROP_NAME);
+    Partition partition = new 
Partition(Integer.parseInt(sspPropertiesMap.get("partition")));
+
+    return new SystemStreamPartition(systemName, streamName, partition);
+  }
+
+  @Override
+  public Checkpoint readLastCheckpoint(TaskName taskName) {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException("reading checkpoint of unregistered/unwritten 
task");
+    }
+
+    // Create the query for taskName
+    String partitionQueryKey = taskName.toString();
+    String partitionFilter = TableQuery.generateFilterCondition(
+            PARTITION_KEY,
+            TableQuery.QueryComparisons.EQUAL,
+            partitionQueryKey);
+    TableQuery<TaskCheckpointEntity> query = 
TableQuery.from(TaskCheckpointEntity.class).where(partitionFilter);
+
+    ImmutableMap.Builder<SystemStreamPartition, String> builder = 
ImmutableMap.builder();
+    try {
+      for (TaskCheckpointEntity taskCheckpointEntity : 
cloudTable.execute(query)) {
+        // Recreate the SSP offset
+        String serializedSSP = taskCheckpointEntity.getRowKey();
+        builder.put(deserializeSystemStreamPartition(serializedSSP), 
taskCheckpointEntity.getOffset());
+      }
+
+    } catch (NoSuchElementException e) {
+      LOG.warn("No checkpoints found found for registered taskName={}", 
taskName);
+      // Return null if not entity elements are not found
+      return null;
+    }
+    LOG.debug("Received checkpoint state for taskName=%s", taskName);
+    return new Checkpoint(builder.build());
+  }
+
+  @Override
+  public void stop() {
+    // Nothing to do here
+  }
+
+  @Override
+  public void clearCheckpoints() {
+    LOG.debug("Clearing all checkpoints in Azure table");
+
+    for (TaskName taskName : taskNames) {
+      String partitionQueryKey = taskName.toString();
+
+      // Generate table query
+      String partitionFilter = TableQuery.generateFilterCondition(
+              PARTITION_KEY,
+              TableQuery.QueryComparisons.EQUAL,
+              partitionQueryKey);
+      TableQuery<TaskCheckpointEntity> partitionQuery = 
TableQuery.from(TaskCheckpointEntity.class)
+              .where(partitionFilter);
+
+      // All entities in a given batch must have the same partition key
+      deleteEntities(cloudTable.execute(partitionQuery).iterator());
+    }
+  }
+
+  private void deleteEntities(Iterator<TaskCheckpointEntity> entitiesToDelete) 
{
+    TableBatchOperation batchOperation = new TableBatchOperation();
+
+    while (entitiesToDelete.hasNext()) {
+      TaskCheckpointEntity entity = entitiesToDelete.next();
+
+      // Add to batch operation
+      batchOperation.delete(entity);
+
+      // Execute when batch reaches capacity or when this is the last item
+      if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || 
!entitiesToDelete.hasNext()) {
+        try {
+          cloudTable.execute(batchOperation);
+        } catch (StorageException e) {
+          LOG.error("Executing batch failed for deleting checkpoints");
+          throw new AzureException(e);
+        }
+        batchOperation.clear();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
new file mode 100644
index 0000000..3c5d62a
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class AzureCheckpointManagerFactory implements CheckpointManagerFactory 
{
+  @Override
+  public CheckpointManager getCheckpointManager(Config config, MetricsRegistry 
registry) {
+    return new AzureCheckpointManager(new AzureConfig(config));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
new file mode 100644
index 0000000..3b673d7
--- /dev/null
+++ 
b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
@@ -0,0 +1,43 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import com.microsoft.azure.storage.table.TableServiceEntity;
+
+public class TaskCheckpointEntity extends TableServiceEntity {
+
+  private String offset;
+
+  public TaskCheckpointEntity() {}
+
+  public TaskCheckpointEntity(String taskName, String systemStreamPartition, 
String offset) {
+    this.partitionKey = taskName;
+    this.rowKey = systemStreamPartition;
+    this.offset = offset;
+  }
+
+  public String getOffset() {
+    return this.offset;
+  }
+
+  public void setOffset(String offset) {
+    this.offset = offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java 
b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
index dc96d2d..d8002b7 100644
--- a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
@@ -43,7 +43,7 @@ public class AzureConfig extends MapConfig {
     tableName = "samzatable" + id;
   }
 
-  public String getAzureConnect() {
+  public String getAzureConnectionString() {
     if (!containsKey(AZURE_STORAGE_CONNECT)) {
       throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " 
config!");
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index b689f3e..f50ab72 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -32,7 +32,7 @@ public class AzureCoordinationUtils implements 
CoordinationUtils {
 
   public AzureCoordinationUtils(Config config) {
     azureConfig = new AzureConfig(config);
-    this.client = new AzureClient(azureConfig.getAzureConnect());
+    this.client = new AzureClient(azureConfig.getAzureConnectionString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 622932f..468705b 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -103,7 +103,7 @@ public class AzureJobCoordinator implements JobCoordinator {
     processorId = createProcessorId(config);
     currentJMVersion = new AtomicReference<>(INITIAL_STATE);
     AzureConfig azureConfig = new AzureConfig(config);
-    client = new AzureClient(azureConfig.getAzureConnect());
+    client = new AzureClient(azureConfig.getAzureConnectionString());
     leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), 
azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
     errorHandler = (errorMsg) -> {
       LOG.error(errorMsg);

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
 
b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
new file mode 100644
index 0000000..3e5ead0
--- /dev/null
+++ 
b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.azure;
+
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Ignore("Intergration Test")
+public class ITestAzureCheckpointManager {
+
+  private static String storageConnectionString = "";
+  private static CheckpointManager checkpointManager;
+
+  @BeforeClass
+  public static void setupAzureTable() {
+    AzureCheckpointManagerFactory factory = new 
AzureCheckpointManagerFactory();
+    checkpointManager = factory.getCheckpointManager(getConfig(), new 
NoOpMetricsRegistry());
+
+    checkpointManager.start();
+    checkpointManager.clearCheckpoints();
+  }
+
+  @AfterClass
+  public static void teardownAzureTable() {
+    checkpointManager.clearCheckpoints();
+    checkpointManager.stop();
+  }
+
+  private static Config getConfig() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(AzureConfig.AZURE_STORAGE_CONNECT, storageConnectionString);
+
+    return new MapConfig(configMap);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsSamePartition() {
+    Partition partition = new Partition(0);
+    TaskName taskName = new TaskName("taskName0");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", 
partition);
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+
+    sspMap.put(ssp, "12345");
+    Checkpoint cp0 = new Checkpoint(sspMap);
+
+    sspMap.put(ssp, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    checkpointManager.register(taskName);
+
+    checkpointManager.writeCheckpoint(taskName, cp0);
+    Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp0, readCp);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp1, readCp1);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsMultiPartitions() {
+    Partition partition = new Partition(0);
+    Partition partition1 = new Partition(1);
+    TaskName taskName = new TaskName("taskName");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", 
partition);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", 
partition1);
+
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    sspMap.put(ssp1, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap2 = new HashMap<>();
+    sspMap2.put(ssp, "12347");
+    sspMap2.put(ssp1, "54323");
+    Checkpoint cp2 = new Checkpoint(sspMap2);
+
+    checkpointManager.register(taskName);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp1, readCp1);
+
+    checkpointManager.writeCheckpoint(taskName, cp2);
+    Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp2, readCp2);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsMultiTasks() {
+    Partition partition = new Partition(0);
+    Partition partition1 = new Partition(1);
+    TaskName taskName = new TaskName("taskName1");
+    TaskName taskName1 = new TaskName("taskName2");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", 
partition);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", 
partition1);
+
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    sspMap.put(ssp1, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap2 = new HashMap<>();
+    sspMap2.put(ssp, "12347");
+    sspMap2.put(ssp1, "54323");
+    Checkpoint cp2 = new Checkpoint(sspMap2);
+
+    checkpointManager.register(taskName);
+    checkpointManager.register(taskName1);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    checkpointManager.writeCheckpoint(taskName1, cp2);
+
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertNotNull(readCp1);
+    Assert.assertEquals(cp1, readCp1);
+
+    Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName1);
+    Assert.assertNotNull(readCp2);
+    Assert.assertEquals(cp2, readCp2);
+
+    checkpointManager.writeCheckpoint(taskName, cp2);
+    checkpointManager.writeCheckpoint(taskName1, cp1);
+
+    readCp1 = checkpointManager.readLastCheckpoint(taskName1);
+    Assert.assertEquals(cp1, readCp1);
+
+    readCp2 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp2, readCp2);
+  }
+
+  @Test
+  public void testMultipleBatchWrites() {
+    TaskName taskName = new TaskName("taskName3");
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+
+    final int testBatchNum = 2;
+    final int testOffsetNum = testBatchNum * 
AzureCheckpointManager.MAX_WRITE_BATCH_SIZE;
+
+    for (int i = 0; i < testOffsetNum; i++) {
+      Partition partition = new Partition(i);
+      SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", 
partition);
+      sspMap.put(ssp, String.valueOf(i));
+    }
+
+    Checkpoint cp0 = new Checkpoint(sspMap);
+    checkpointManager.register(taskName);
+    checkpointManager.writeCheckpoint(taskName, cp0);
+    Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp0, readCp);
+  }
+}

Reply via email to