dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616971736



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       Agreed; IMO keeping it in String format will make it easier to read. I will rename it and use the Jackson model here and for all other occurrences.
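
The warning above couples `toString()` and `fromString(String)`: whatever one writes, the other must be able to parse, for every checkpoint already persisted. Below is a minimal sketch of that round trip. It assumes a `millis-nanos` string layout. `CheckpointIdSketch` and its separator are hypothetical stand-ins, not the Samza class.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for CheckpointId (not the Samza class).
 * The "millis-nanos" layout and the "-" separator are assumptions.
 */
final class CheckpointIdSketch {
  static final String SEPARATOR = "-";

  private final long millis;
  private final long nanos;

  CheckpointIdSketch(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  // Serialized form: changing it would break fromString() for checkpoints already written.
  @Override
  public String toString() {
    return millis + SEPARATOR + nanos;
  }

  // Inverse of toString(); assumes both parts are non-negative so "-" is unambiguous.
  static CheckpointIdSketch fromString(String s) {
    if (s == null) {
      throw new IllegalArgumentException("Invalid checkpoint id: null");
    }
    String[] parts = s.split(SEPARATOR);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid checkpoint id: " + s);
    }
    try {
      return new CheckpointIdSketch(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid checkpoint id: " + s, e);
    }
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof CheckpointIdSketch)) {
      return false;
    }
    CheckpointIdSketch that = (CheckpointIdSketch) o;
    return millis == that.millis && nanos == that.nanos;
  }

  @Override
  public int hashCode() {
    return Objects.hash(millis, nanos);
  }
}
```

The reply proposes moving to a dedicated Jackson model. That would make the serialized format explicit instead of leaving it implicit in `toString()`.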

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
   private final long millis;
   private final long nanos;
 
-  public CheckpointId(long millis, long nanos) {
+  private CheckpointId(long millis, long nanos) {
     this.millis = millis;
     this.nanos = nanos;
   }

Review comment:
       I think in that case we can directly store `nanos % 1e6` as the class field; will make that change.
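
With the constructor made private, construction can go through factories that enforce the invariant that `nanos` holds only the sub-millisecond part. The sketch below assumes that invariant. `CheckpointIdFactorySketch`, `create()` and `of()` are hypothetical names. In Java, `1e6` is a `double` literal, so this sketch uses an integer constant to keep the field a `long`.

```java
/**
 * Hypothetical sketch (not the Samza class): a private constructor plus factories
 * guarantee that nanos is always in [0, 1_000_000).
 */
final class CheckpointIdFactorySketch {
  private static final long NANOS_PER_MILLI = 1_000_000L;

  private final long millis;
  private final long nanos;

  private CheckpointIdFactorySketch(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  // New id from the wall clock plus a sub-millisecond tiebreaker.
  static CheckpointIdFactorySketch create() {
    // floorMod keeps the result non-negative even though nanoTime() may be negative.
    return new CheckpointIdFactorySketch(System.currentTimeMillis(),
        Math.floorMod(System.nanoTime(), NANOS_PER_MILLI));
  }

  // Rebuilds an id from stored parts, validating the nanos invariant.
  static CheckpointIdFactorySketch of(long millis, long nanos) {
    if (nanos < 0 || nanos >= NANOS_PER_MILLI) {
      throw new IllegalArgumentException("nanos must be in [0, 1000000): " + nanos);
    }
    return new CheckpointIdFactorySketch(millis, nanos);
  }

  long getMillis() {
    return millis;
  }

  long getNanos() {
    return nanos;
  }
}
```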

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -19,54 +19,27 @@
 
 package org.apache.samza.checkpoint;
 
+import java.util.Map;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
 
+public interface Checkpoint {
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Gets the version number of the Checkpoint
+   * @return Short indicating the version number
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
+  short getVersion();
 
   /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+   * Gets a unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
+   * The returned value differs based on the Checkpoint version:
+   * <ol>
+   *    <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
+   *      as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
+   *    <li>For {@link CheckpointV2} returns the input offsets only.</li>
+   * </ol>

Review comment:
       Going forward, we are going to rely on the former: we will create CheckpointV2 with separate state offsets and input offsets. I have changed the checkpoint creation call sites in this PR to reflect that. Furthermore, the CheckpointV2 implementation keeps input and state offsets separate.
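
The version-dependent contract of `getOffsets()` described in the Javadoc can be sketched with two simplified implementations. This is an illustration only. SSPs are plain strings here, and the class names and the merge in V1 are assumptions rather than the PR's code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model: SSPs are plain strings instead of SystemStreamPartition.
interface CheckpointSketch {
  short getVersion();

  // Last processed offsets; what is included depends on the version.
  Map<String, String> getOffsets();
}

// V1: input offsets and store changelog offsets share a single map.
final class CheckpointV1Sketch implements CheckpointSketch {
  static final short VERSION = 1;
  private final Map<String, String> offsets;

  CheckpointV1Sketch(Map<String, String> inputOffsets, Map<String, String> changelogOffsets) {
    Map<String, String> merged = new HashMap<>(inputOffsets);
    merged.putAll(changelogOffsets);
    this.offsets = Collections.unmodifiableMap(merged);
  }

  @Override
  public short getVersion() {
    return VERSION;
  }

  @Override
  public Map<String, String> getOffsets() {
    return offsets;
  }
}

// V2: getOffsets() returns input offsets only; state markers are kept per backend and store.
final class CheckpointV2Sketch implements CheckpointSketch {
  static final short VERSION = 2;
  private final Map<String, String> inputOffsets;
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  CheckpointV2Sketch(Map<String, String> inputOffsets,
      Map<String, Map<String, String>> stateCheckpointMarkers) {
    this.inputOffsets = Collections.unmodifiableMap(new HashMap<>(inputOffsets));
    // Shallow copy: the inner per-store maps are not copied in this sketch.
    this.stateCheckpointMarkers = Collections.unmodifiableMap(new HashMap<>(stateCheckpointMarkers));
  }

  @Override
  public short getVersion() {
    return VERSION;
  }

  @Override
  public Map<String, String> getOffsets() {
    return inputOffsets;
  }

  Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```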

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -122,6 +139,15 @@ public StorageConfig(Config config) {
     return Optional.ofNullable(systemStreamRes);
   }
 
+  public List<String> getStoreBackupManagerClassName(String storeName) {
+    List<String> storeBackupManagers = getList(String.format(STORE_BACKEND_BACKUP_FACTORIES, storeName), new ArrayList<>());
+    // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
+    if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+      storeBackupManagers = DEFAULT_STATE_BACKEND_BACKUP_FACTORIES;
+    }

Review comment:
       For the first phase of the migration, we default to restoring via Kafka, and the changelog will be enabled by default for all stores, with dual commits to both the blob store and the changelog.
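
The backward-compatible default in `getStoreBackupManagerClassName` works like this. Explicitly configured backup factories win. Otherwise, a store with a changelog falls back to the Kafka backup factory. The sketch below uses a plain `Map` config. The key formats `stores.%s.backup.factories` and `stores.%s.changelog` are placeholders assumed for illustration, not confirmed Samza config names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the fallback; not Samza's StorageConfig.
final class BackupFactoryResolverSketch {
  static final String BACKUP_FACTORIES_KEY = "stores.%s.backup.factories"; // assumed key
  static final String CHANGELOG_KEY = "stores.%s.changelog";              // assumed key
  static final List<String> DEFAULT_BACKUP_FACTORIES =
      Collections.singletonList("org.apache.samza.storage.KafkaChangelogStateBackendFactory");

  static List<String> getStoreBackupFactories(Map<String, String> config, String storeName) {
    // Parse the explicitly configured, comma-separated factory list (if any).
    List<String> factories = new ArrayList<>();
    String configured = config.get(String.format(BACKUP_FACTORIES_KEY, storeName));
    if (configured != null) {
      for (String factory : configured.split(",")) {
        if (!factory.trim().isEmpty()) {
          factories.add(factory.trim());
        }
      }
    }
    // Backwards compatibility: no explicit factories but a changelog means Kafka backup.
    boolean hasChangelog = config.containsKey(String.format(CHANGELOG_KEY, storeName));
    if (factories.isEmpty() && hasChangelog) {
      return DEFAULT_BACKUP_FACTORIES;
    }
    return factories;
  }
}
```

During the dual-commit phase described in the reply, a store could list both a blob store factory and the Kafka factory explicitly. The fallback only matters for stores that configure neither.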

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
[email protected]
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));

Review comment:
       will wrap in IAE
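
Wrapping the `Integer.parseInt` failure means every malformed marker surfaces as an `IllegalArgumentException`, with the `NumberFormatException` kept as the cause. Below is a sketch of the whole parse with that change applied. `KafkaMarkerParseSketch` is a hypothetical simplification that keeps the partition as an `int` instead of building a `SystemStreamPartition`.

```java
// Hypothetical simplification of KafkaStateCheckpointMarker.fromString with the parse wrapped.
final class KafkaMarkerParseSketch {
  static final String SEPARATOR = ";";

  final String system;
  final String stream;
  final int partition;
  final String offset; // null when the marker's offset token is the literal "null"

  KafkaMarkerParseSketch(String system, String stream, int partition, String offset) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
    this.offset = offset;
  }

  static KafkaMarkerParseSketch fromString(String marker) {
    if (marker == null || marker.trim().isEmpty()) {
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + marker);
    }
    String[] payload = marker.split(SEPARATOR);
    if (payload.length != 4) {
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + marker);
    }
    int partition;
    try {
      partition = Integer.parseInt(payload[2]);
    } catch (NumberFormatException e) {
      // Same exception type as the other validation failures; original cause preserved.
      throw new IllegalArgumentException("Invalid partition in KafkaStateCheckpointMarker: " + marker, e);
    }
    String offset = "null".equals(payload[3]) ? null : payload[3];
    return new KafkaMarkerParseSketch(payload[0], payload[1], partition, offset);
  }
}
```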

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.

Review comment:
       A version field makes sense here; will add.
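
One way to add the version field is to make it the first token of the marker. A reader can then reject, or branch on, formats it does not understand before parsing the rest. The sketch assumes a `version;system;stream;partition;offset` layout. Both that layout and `VersionedMarkerSketch` are hypothetical.

```java
import java.util.Arrays;

// Hypothetical versioned marker codec; the layout is an assumption, not the PR's format.
final class VersionedMarkerSketch {
  static final short VERSION = 1;
  static final String SEPARATOR = ";";

  static String serialize(String system, String stream, int partition, String offset) {
    // A null offset is written as the literal "null", as in the existing marker format.
    return VERSION + SEPARATOR + system + SEPARATOR + stream + SEPARATOR + partition + SEPARATOR + offset;
  }

  // Returns [system, stream, partition, offset] after validating the leading version token.
  static String[] deserialize(String marker) {
    if (marker == null || marker.isEmpty()) {
      throw new IllegalArgumentException("Invalid marker: " + marker);
    }
    String[] payload = marker.split(SEPARATOR);
    if (payload.length == 0) {
      throw new IllegalArgumentException("Invalid marker: " + marker);
    }
    short version;
    try {
      version = Short.parseShort(payload[0]);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid marker version: " + marker, e);
    }
    if (version != VERSION) {
      throw new IllegalArgumentException("Unsupported marker version " + version + ": " + marker);
    }
    if (payload.length != 5) {
      throw new IllegalArgumentException("Invalid marker parts count: " + marker);
    }
    return Arrays.copyOfRange(payload, 1, payload.length);
  }
}
```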

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+
+public class CheckpointV2 implements Checkpoint {
+  public static final short CHECKPOINT_VERSION = 2;
+
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, Map<String, String>> stateCheckpointMarkers;
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   *
+   * @param checkpointId {@link CheckpointId} associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of state backend factory name to map of local state store names
+   *                         to state checkpoints
+   */
+  public CheckpointV2(CheckpointId checkpointId,
+      Map<SystemStreamPartition, String> inputOffsets,
+      Map<String, Map<String, String>> stateCheckpoints) {
+    this.checkpointId = checkpointId;
+    this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
+    this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);

Review comment:
       Good point, they should not be null; adding invariant checks
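
The invariant checks can fail fast with a named message before copying. Guava's `ImmutableMap.copyOf` already throws a `NullPointerException` on a null map, but the exception does not say which argument was null. The sketch below uses only the JDK. `CheckpointV2InvariantSketch` is hypothetical and uses strings in place of `CheckpointId` and `SystemStreamPartition`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the CheckpointV2 constructor with explicit non-null invariants.
final class CheckpointV2InvariantSketch {
  private final String checkpointId;
  private final Map<String, String> inputOffsets;
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  CheckpointV2InvariantSketch(String checkpointId, Map<String, String> inputOffsets,
      Map<String, Map<String, String>> stateCheckpointMarkers) {
    // Each check names the offending argument in the NullPointerException message.
    this.checkpointId = Objects.requireNonNull(checkpointId, "checkpointId must not be null");
    this.inputOffsets = Collections.unmodifiableMap(
        new HashMap<>(Objects.requireNonNull(inputOffsets, "inputOffsets must not be null")));
    this.stateCheckpointMarkers = Collections.unmodifiableMap(
        new HashMap<>(Objects.requireNonNull(stateCheckpointMarkers, "stateCheckpointMarkers must not be null")));
  }

  String getCheckpointId() {
    return checkpointId;
  }

  Map<String, String> getInputOffsets() {
    return inputOffsets;
  }

  Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```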

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.util.Map;
+
+/**
+ * Used for Json serialization of the {@link org.apache.samza.checkpoint.Checkpoint} class by the
+ * {@link CheckpointV2Serde}
+ * This cannot be an internal class as required by Jackson Object mapper
+ */
+public class JsonCheckpoint {
+  private String checkpointId;
+  private Map<String, Map<String, String>> inputOffsets;
+  // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>

Review comment:
       will change this to the mixin model

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
[email protected]
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));
+    String offset = null;
+    if (!"null".equals(payload[3])) {
+      offset = payload[3];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(system, stream, partition), offset);
+  }
+
+  public SystemStreamPartition getChangelogSSP() {
+    return changelogSSP;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  /**
+   * Builds a map of store changelog SSPs to their offset for Kafka changelog backed stores from the provided
+   * map of state backend factory name to map of store name to serialized state checkpoint markers.
+   *
+   * @param stateBackendToStoreSCMs Map of state backend factory name to map of store name to serialized
+   *                                state checkpoint markers
+   * @return Map of store changelog SSPss to their optional offset, or an empty map if there is no mapping for
+   * {@link #KAFKA_STATE_BACKEND_FACTORY_NAME} in the input map. Optional offset may be empty if the
+   * changelog SSP was empty.
+   */
+  public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
+      Map<String, Map<String, String>> stateBackendToStoreSCMs) {
+    Map<SystemStreamPartition, Option<String>> sspToOffsetOptions = new HashMap<>();
+    if (stateBackendToStoreSCMs.containsKey(KAFKA_STATE_BACKEND_FACTORY_NAME)) {
+      Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
+      storeToKafkaSCMs.forEach((key, value) -> {
+        KafkaStateCheckpointMarker stateMarker = KafkaStateCheckpointMarker.fromString(value);
+        Option<String> offsetOption = Option.apply(stateMarker.getChangelogOffset());
+        sspToOffsetOptions.put(new SystemStreamPartition(stateMarker.getChangelogSSP()), offsetOption);
+      });
+    }
+    return sspToOffsetOptions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;

Review comment:
       will fix here and everywhere else
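
The thread does not say what is being fixed. Assuming it refers to the brace-less single-statement `if`s, which brace-enforcing checkstyle rules reject, a braced `equals`/`hashCode` pair could look like the following. `ChangelogMarkerSketch` is hypothetical. It uses `Objects.equals` because the changelog offset may be `null`.

```java
import java.util.Objects;

// Hypothetical value class mirroring the marker's fields; SSP simplified to a String.
final class ChangelogMarkerSketch {
  private final String changelogSsp;
  private final String changelogOffset; // may be null for an empty changelog

  ChangelogMarkerSketch(String changelogSsp, String changelogOffset) {
    this.changelogSsp = changelogSsp;
    this.changelogOffset = changelogOffset;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ChangelogMarkerSketch that = (ChangelogMarkerSketch) o;
    // Objects.equals tolerates the null offset.
    return Objects.equals(changelogSsp, that.changelogSsp)
        && Objects.equals(changelogOffset, that.changelogOffset);
  }

  @Override
  public int hashCode() {
    return Objects.hash(changelogSsp, changelogOffset);
  }
}
```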




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

