Repository: samza Updated Branches: refs/heads/master 05113c339 -> adfc4bfc4
SAMZA-1486; Checkpoint manager implementation with Azure Table vjagadish1989 Author: Daniel Chen <[email protected]> Reviewers: Jagadish<[email protected]> Closes #341 from dxichen/azure-checkpoint-manager Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adfc4bfc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adfc4bfc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adfc4bfc Branch: refs/heads/master Commit: adfc4bfc40a5d92b3cc7e7813dd1a73c0c486e0e Parents: 05113c3 Author: Daniel Chen <[email protected]> Authored: Tue Nov 7 17:25:58 2017 -0800 Committer: Jagadish <[email protected]> Committed: Tue Nov 7 17:25:58 2017 -0800 ---------------------------------------------------------------------- .../samza/checkpoint/CheckpointManager.java | 6 +- .../azure/AzureCheckpointManager.java | 236 +++++++++++++++++++ .../azure/AzureCheckpointManagerFactory.java | 33 +++ .../checkpoint/azure/TaskCheckpointEntity.java | 43 ++++ .../org/apache/samza/config/AzureConfig.java | 2 +- .../coordinator/AzureCoordinationUtils.java | 2 +- .../samza/coordinator/AzureJobCoordinator.java | 2 +- .../azure/ITestAzureCheckpointManager.java | 181 ++++++++++++++ 8 files changed, 501 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java index 10f166c..bc75351 100644 --- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java +++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java @@ -44,10 +44,14 @@ public interface CheckpointManager { /** * Returns the last recorded checkpoint for a specified taskName. * @param taskName Specific Samza taskName for which to get the last checkpoint of. - * @return A Checkpoint object with the recorded offset data of the specified partition. + * @return A Checkpoint object with the recorded offset data of the specified partition + * or null if there is no recorded checkpoints for the task. */ Checkpoint readLastCheckpoint(TaskName taskName); + /** + * Perform teardown operations for the Manager. Checkpoints are still persisted. + */ void stop(); /** http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java new file mode 100644 index 0000000..df3e490 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java @@ -0,0 +1,236 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.checkpoint.azure; + +import com.google.common.collect.ImmutableMap; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.table.*; +import org.apache.samza.AzureClient; +import org.apache.samza.AzureException; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointManager; +import org.apache.samza.config.AzureConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * Azure checkpoint manager is used to store checkpoints in a Azure Table. + * All the task checkpoints are added to the a single table named "SamzaTaskCheckpoints". + * The table entities take the following form: + * + * +-----------------+---------------------+-------------------+ + * | | Serialized | | + * | TaskName | JSON SSP | Offset | + * | | | | + * +-----------------+---------------------+-------------------+ + * + * Each entity have a partitionKey set as the TaskName and the rowKey set as the SSP. + */ +public class AzureCheckpointManager implements CheckpointManager { + private static final Logger LOG = LoggerFactory.getLogger(AzureCheckpointManager.class.getName()); + private static final String PARTITION_KEY = "PartitionKey"; + + public static final int MAX_WRITE_BATCH_SIZE = 100; + public static final String CHECKPOINT_MANAGER_TABLE_NAME = "SamzaTaskCheckpoints"; + public static final String SYSTEM_PROP_NAME = "system"; + public static final String STREAM_PROP_NAME = "stream"; + public static final String PARTITION_PROP_NAME = "partition"; + + private final String storageConnectionString; + private final AzureClient azureClient; + private CloudTable cloudTable; + + private final Set<TaskName> taskNames = new HashSet<>(); + private final JsonSerdeV2<Map<String, String>> jsonSerde = new JsonSerdeV2<>(); + + AzureCheckpointManager(AzureConfig azureConfig) { + storageConnectionString = azureConfig.getAzureConnectionString(); + azureClient = new AzureClient(storageConnectionString); + } + + @Override + public void start() { + try { + // Create the table if it doesn't exist. + cloudTable = azureClient.getTableClient().getTableReference(CHECKPOINT_MANAGER_TABLE_NAME); + cloudTable.createIfNotExists(); + + } catch (URISyntaxException e) { + LOG.error("Connection string {} specifies an invalid URI while creating checkpoint table.", + storageConnectionString); + throw new AzureException(e); + + } catch (StorageException e) { + LOG.error("Azure Storage failed when creating checkpoint table", e); + throw new AzureException(e); + } + } + + @Override + public void register(TaskName taskName) { + taskNames.add(taskName); + } + + @Override + public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) { + if (!taskNames.contains(taskName)) { + throw new SamzaException("writing checkpoint of unregistered task"); + } + + TableBatchOperation batchOperation = new TableBatchOperation(); + + Iterator<Map.Entry<SystemStreamPartition, String>> iterator = checkpoint.getOffsets().entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<SystemStreamPartition, String> entry = iterator.next(); + SystemStreamPartition ssp = entry.getKey(); + String offset = entry.getValue(); + + // Create table entity + TaskCheckpointEntity taskCheckpoint = new TaskCheckpointEntity(taskName.toString(), + serializeSystemStreamPartition(ssp), offset); + + // Add to batch operation + batchOperation.insertOrReplace(taskCheckpoint); + + // Execute when batch reaches capacity or this is the last item + if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || !iterator.hasNext()) { + try { + cloudTable.execute(batchOperation); + } catch (StorageException e) { + LOG.error("Executing batch failed for task: {}", taskName); + throw new AzureException(e); + } + batchOperation.clear(); + } + } + } + + private String serializeSystemStreamPartition(SystemStreamPartition ssp) { + // Create the Json string for SystemStreamPartition + Map<String, String> sspMap = new HashMap<>(); + + sspMap.put(SYSTEM_PROP_NAME, ssp.getSystem()); + sspMap.put(STREAM_PROP_NAME, ssp.getStream()); + sspMap.put(PARTITION_PROP_NAME, String.valueOf(ssp.getPartition().getPartitionId())); + + return new String(jsonSerde.toBytes(sspMap)); + } + + private SystemStreamPartition deserializeSystemStreamPartition(String serializedSSP) { + Map<String, String> sspPropertiesMap = jsonSerde.fromBytes(serializedSSP.getBytes()); + + String systemName = sspPropertiesMap.get(SYSTEM_PROP_NAME); + String streamName = sspPropertiesMap.get(STREAM_PROP_NAME); + Partition partition = new Partition(Integer.parseInt(sspPropertiesMap.get("partition"))); + + return new SystemStreamPartition(systemName, streamName, partition); + } + + @Override + public Checkpoint readLastCheckpoint(TaskName taskName) { + if (!taskNames.contains(taskName)) { + throw new SamzaException("reading checkpoint of unregistered/unwritten task"); + } + + // Create the query for taskName + String partitionQueryKey = taskName.toString(); + String partitionFilter = TableQuery.generateFilterCondition( + PARTITION_KEY, + TableQuery.QueryComparisons.EQUAL, + partitionQueryKey); + TableQuery<TaskCheckpointEntity> query = TableQuery.from(TaskCheckpointEntity.class).where(partitionFilter); + + ImmutableMap.Builder<SystemStreamPartition, String> builder = ImmutableMap.builder(); + try { + for (TaskCheckpointEntity taskCheckpointEntity : cloudTable.execute(query)) { + // Recreate the SSP offset + String serializedSSP = taskCheckpointEntity.getRowKey(); + builder.put(deserializeSystemStreamPartition(serializedSSP), taskCheckpointEntity.getOffset()); + } + + } catch (NoSuchElementException e) { + LOG.warn("No checkpoints found found for registered taskName={}", taskName); + // Return null if not entity elements are not found + return null; + } + LOG.debug("Received checkpoint state for taskName=%s", taskName); + return new Checkpoint(builder.build()); + } + + @Override + public void stop() { + // Nothing to do here + } + + @Override + public void clearCheckpoints() { + LOG.debug("Clearing all checkpoints in Azure table"); + + for (TaskName taskName : taskNames) { + String partitionQueryKey = taskName.toString(); + + // Generate table query + String partitionFilter = TableQuery.generateFilterCondition( + PARTITION_KEY, + TableQuery.QueryComparisons.EQUAL, + partitionQueryKey); + TableQuery<TaskCheckpointEntity> partitionQuery = TableQuery.from(TaskCheckpointEntity.class) + .where(partitionFilter); + + // All entities in a given batch must have the same partition key + deleteEntities(cloudTable.execute(partitionQuery).iterator()); + } + } + + private void deleteEntities(Iterator<TaskCheckpointEntity> entitiesToDelete) { + TableBatchOperation batchOperation = new TableBatchOperation(); + + while (entitiesToDelete.hasNext()) { + TaskCheckpointEntity entity = entitiesToDelete.next(); + + // Add to batch operation + batchOperation.delete(entity); + + // Execute when batch reaches capacity or when this is the last item + if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || !entitiesToDelete.hasNext()) { + try { + cloudTable.execute(batchOperation); + } catch (StorageException e) { + LOG.error("Executing batch failed for deleting checkpoints"); + throw new AzureException(e); + } + batchOperation.clear(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java new file mode 100644 index 0000000..3c5d62a --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.checkpoint.azure; + +import org.apache.samza.checkpoint.CheckpointManager; +import org.apache.samza.checkpoint.CheckpointManagerFactory; +import org.apache.samza.config.AzureConfig; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; + +public class AzureCheckpointManagerFactory implements CheckpointManagerFactory { + @Override + public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) { + return new AzureCheckpointManager(new AzureConfig(config)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java new file mode 100644 index 0000000..3b673d7 --- /dev/null +++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java @@ -0,0 +1,43 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.samza.checkpoint.azure; + +import com.microsoft.azure.storage.table.TableServiceEntity; + +public class TaskCheckpointEntity extends TableServiceEntity { + + private String offset; + + public TaskCheckpointEntity() {} + + public TaskCheckpointEntity(String taskName, String systemStreamPartition, String offset) { + this.partitionKey = taskName; + this.rowKey = systemStreamPartition; + this.offset = offset; + } + + public String getOffset() { + return this.offset; + } + + public void setOffset(String offset) { + this.offset = offset; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java index dc96d2d..d8002b7 100644 --- a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java @@ -43,7 +43,7 @@ public class AzureConfig extends MapConfig { tableName = "samzatable" + id; } - public String getAzureConnect() { + public String getAzureConnectionString() { if (!containsKey(AZURE_STORAGE_CONNECT)) { throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!"); } http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java index b689f3e..f50ab72 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java @@ -32,7 +32,7 @@ public class AzureCoordinationUtils implements CoordinationUtils { public AzureCoordinationUtils(Config config) { azureConfig = new AzureConfig(config); - this.client = new AzureClient(azureConfig.getAzureConnect()); + this.client = new AzureClient(azureConfig.getAzureConnectionString()); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java index 622932f..468705b 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java @@ -103,7 +103,7 @@ public class AzureJobCoordinator implements JobCoordinator { processorId = createProcessorId(config); currentJMVersion = new AtomicReference<>(INITIAL_STATE); AzureConfig azureConfig = new AzureConfig(config); - client = new AzureClient(azureConfig.getAzureConnect()); + client = new AzureClient(azureConfig.getAzureConnectionString()); leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength()); errorHandler = (errorMsg) -> { LOG.error(errorMsg); http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java new file mode 100644 index 0000000..3e5ead0 --- /dev/null +++ b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.checkpoint.azure; + +import junit.framework.Assert; +import org.apache.samza.Partition; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointManager; +import org.apache.samza.config.AzureConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.*; + +import java.util.HashMap; +import java.util.Map; + +@Ignore("Intergration Test") +public class ITestAzureCheckpointManager { + + private static String storageConnectionString = ""; + private static CheckpointManager checkpointManager; + + @BeforeClass + public static void setupAzureTable() { + AzureCheckpointManagerFactory factory = new AzureCheckpointManagerFactory(); + checkpointManager = factory.getCheckpointManager(getConfig(), new NoOpMetricsRegistry()); + + checkpointManager.start(); + checkpointManager.clearCheckpoints(); + } + + @AfterClass + public static void teardownAzureTable() { + checkpointManager.clearCheckpoints(); + checkpointManager.stop(); + } + + private static Config getConfig() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(AzureConfig.AZURE_STORAGE_CONNECT, storageConnectionString); + + return new MapConfig(configMap); + } + + @Test + public void testStoringAndReadingCheckpointsSamePartition() { + Partition partition = new Partition(0); + TaskName taskName = new TaskName("taskName0"); + SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition); + Map<SystemStreamPartition, String> sspMap = new HashMap<>(); + + sspMap.put(ssp, "12345"); + Checkpoint cp0 = new Checkpoint(sspMap); + + sspMap.put(ssp, "54321"); + Checkpoint cp1 = new Checkpoint(sspMap); + + checkpointManager.register(taskName); + + checkpointManager.writeCheckpoint(taskName, cp0); + Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName); + Assert.assertEquals(cp0, readCp); + + checkpointManager.writeCheckpoint(taskName, cp1); + Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName); + Assert.assertEquals(cp1, readCp1); + } + + @Test + public void testStoringAndReadingCheckpointsMultiPartitions() { + Partition partition = new Partition(0); + Partition partition1 = new Partition(1); + TaskName taskName = new TaskName("taskName"); + SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition); + SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", partition1); + + Map<SystemStreamPartition, String> sspMap = new HashMap<>(); + sspMap.put(ssp, "12345"); + sspMap.put(ssp1, "54321"); + Checkpoint cp1 = new Checkpoint(sspMap); + + Map<SystemStreamPartition, String> sspMap2 = new HashMap<>(); + sspMap2.put(ssp, "12347"); + sspMap2.put(ssp1, "54323"); + Checkpoint cp2 = new Checkpoint(sspMap2); + + checkpointManager.register(taskName); + + checkpointManager.writeCheckpoint(taskName, cp1); + Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName); + Assert.assertEquals(cp1, readCp1); + + checkpointManager.writeCheckpoint(taskName, cp2); + Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName); + Assert.assertEquals(cp2, readCp2); + } + + @Test + public void testStoringAndReadingCheckpointsMultiTasks() { + Partition partition = new Partition(0); + Partition partition1 = new Partition(1); + TaskName taskName = new TaskName("taskName1"); + TaskName taskName1 = new TaskName("taskName2"); + SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition); + SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", partition1); + + Map<SystemStreamPartition, String> sspMap = new HashMap<>(); + sspMap.put(ssp, "12345"); + sspMap.put(ssp1, "54321"); + Checkpoint cp1 = new Checkpoint(sspMap); + + Map<SystemStreamPartition, String> sspMap2 = new HashMap<>(); + sspMap2.put(ssp, "12347"); + sspMap2.put(ssp1, "54323"); + Checkpoint cp2 = new Checkpoint(sspMap2); + + checkpointManager.register(taskName); + checkpointManager.register(taskName1); + + checkpointManager.writeCheckpoint(taskName, cp1); + checkpointManager.writeCheckpoint(taskName1, cp2); + + Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName); + Assert.assertNotNull(readCp1); + Assert.assertEquals(cp1, readCp1); + + Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName1); + Assert.assertNotNull(readCp2); + Assert.assertEquals(cp2, readCp2); + + checkpointManager.writeCheckpoint(taskName, cp2); + checkpointManager.writeCheckpoint(taskName1, cp1); + + readCp1 = checkpointManager.readLastCheckpoint(taskName1); + Assert.assertEquals(cp1, readCp1); + + readCp2 = checkpointManager.readLastCheckpoint(taskName); + Assert.assertEquals(cp2, readCp2); + } + + @Test + public void testMultipleBatchWrites() { + TaskName taskName = new TaskName("taskName3"); + Map<SystemStreamPartition, String> sspMap = new HashMap<>(); + + final int testBatchNum = 2; + final int testOffsetNum = testBatchNum * AzureCheckpointManager.MAX_WRITE_BATCH_SIZE; + + for (int i = 0; i < testOffsetNum; i++) { + Partition partition = new Partition(i); + SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition); + sspMap.put(ssp, String.valueOf(i)); + } + + Checkpoint cp0 = new Checkpoint(sspMap); + checkpointManager.register(taskName); + checkpointManager.writeCheckpoint(taskName, cp0); + Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName); + Assert.assertEquals(cp0, readCp); + } +}
