dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616971736
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
return nanos;
}
+ /**
+ * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+ * checkpoints, in conjunction with {@link #fromString(String)}.
+ * @return the String representation of this {@link CheckpointId}.
+ */
Review comment:
Agreed; IMO keeping it in String format will make it easier to read. Will rename it and use the Jackson model here and for all other occurrences.
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
private final long millis;
private final long nanos;
- public CheckpointId(long millis, long nanos) {
+ private CheckpointId(long millis, long nanos) {
this.millis = millis;
this.nanos = nanos;
}
Review comment:
I think in that case we can directly store `nanos % 1e6` (only the sub-millisecond part) as the field; will make that change.
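A minimal sketch of how `CheckpointId` could look with both changes applied: a private constructor, only the sub-millisecond nanoseconds stored, and a stable `toString()`/`fromString()` pair. The `create()` factory and the `-` separator are assumptions for illustration, not necessarily what the PR uses. Note that in Java `1e6` is a `double`, so `nanos % 1e6` would produce a `double`; the sketch uses `long` arithmetic and `Math.floorMod` because `System.nanoTime()` may return a negative value.

```java
import java.util.Objects;

/** Sketch only: private constructor plus a toString()/fromString() serde pair. */
final class CheckpointId implements Comparable<CheckpointId> {
  // Assumed separator between the millis and nanos parts of the String form.
  static final String SEPARATOR = "-";

  private final long millis;
  // Only the sub-millisecond part, so always in [0, 1_000_000).
  private final long nanos;

  private CheckpointId(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  /** Hypothetical factory: wall-clock millis plus the sub-millisecond part of nanoTime(). */
  static CheckpointId create() {
    // floorMod keeps the remainder non-negative even if nanoTime() is negative.
    return new CheckpointId(System.currentTimeMillis(), Math.floorMod(System.nanoTime(), 1_000_000L));
  }

  long getMillis() {
    return millis;
  }

  long getNanos() {
    return nanos;
  }

  /** Do not change: fromString() parses exactly this format. */
  @Override
  public String toString() {
    return millis + SEPARATOR + nanos;
  }

  static CheckpointId fromString(String serialized) {
    String[] parts = serialized.split(SEPARATOR);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid CheckpointId: " + serialized);
    }
    try {
      return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid CheckpointId: " + serialized, e);
    }
  }

  @Override
  public int compareTo(CheckpointId other) {
    int byMillis = Long.compare(millis, other.millis);
    return byMillis != 0 ? byMillis : Long.compare(nanos, other.nanos);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    CheckpointId that = (CheckpointId) o;
    return millis == that.millis && nanos == that.nanos;
  }

  @Override
  public int hashCode() {
    return Objects.hash(millis, nanos);
  }
}
```

Because the constructor is private, callers can only obtain ids through `create()` or `fromString()`, which keeps the `nanos` range invariant in one place.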
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -19,54 +19,27 @@
package org.apache.samza.checkpoint;
+import java.util.Map;
import org.apache.samza.system.SystemStreamPartition;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
+public interface Checkpoint {
/**
- * Constructs a new checkpoint based off a map of Samza stream offsets.
- * @param offsets Map of Samza streams to their current offset.
+ * Gets the version number of the Checkpoint
+ * @return Short indicating the version number
*/
- public Checkpoint(Map<SystemStreamPartition, String> offsets) {
- this.offsets = offsets;
- }
+ short getVersion();
/**
- * Gets a unmodifiable view of the current Samza stream offsets.
- * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+ * Gets a unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
+ * The returned value differs based on the Checkpoint version:
+ * <ol>
+ * <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
+ * as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
+ * <li>For {@link CheckpointV2} returns the input offsets only.</li>
+ * </ol>
Review comment:
Going forward, we are going to rely on the former: we will create CheckpointV2 with input offsets and state offsets kept separate. I have updated the places in this PR that create checkpoints accordingly. The CheckpointV2 implementation also keeps input and state offsets separate.
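The distinction described above can be sketched with a simplified model in which `SystemStreamPartition`s are rendered as plain strings (e.g. `"kafka.input.0"`); these stand-ins and the constructors are hypothetical, not the PR's actual classes. `CheckpointV1` returns one flat map mixing input and changelog offsets, while `CheckpointV2` returns only input offsets from `getOffsets()` and keeps state checkpoint markers in a separate map keyed by state backend factory name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the Checkpoint interface; SSPs are plain strings here. */
interface Checkpoint {
  short getVersion();

  /** Unmodifiable view of offsets; what it contains depends on the version. */
  Map<String, String> getOffsets();
}

/** V1: one flat map holding input offsets and store changelog offsets together. */
final class CheckpointV1 implements Checkpoint {
  private final Map<String, String> offsets;

  CheckpointV1(Map<String, String> offsets) {
    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
  }

  @Override
  public short getVersion() {
    return 1;
  }

  @Override
  public Map<String, String> getOffsets() {
    return offsets;
  }
}

/** V2: input offsets and state checkpoint markers are stored separately. */
final class CheckpointV2 implements Checkpoint {
  private final Map<String, String> inputOffsets;
  // state backend factory name -> (store name -> serialized state checkpoint marker)
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  CheckpointV2(Map<String, String> inputOffsets,
      Map<String, Map<String, String>> stateCheckpointMarkers) {
    // Shallow, unmodifiable copies of the caller's maps.
    this.inputOffsets = Collections.unmodifiableMap(new HashMap<>(inputOffsets));
    this.stateCheckpointMarkers = Collections.unmodifiableMap(new HashMap<>(stateCheckpointMarkers));
  }

  @Override
  public short getVersion() {
    return 2;
  }

  /** Input offsets only; changelog offsets live in the state checkpoint markers. */
  @Override
  public Map<String, String> getOffsets() {
    return inputOffsets;
  }

  public Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```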
##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -122,6 +139,15 @@ public StorageConfig(Config config) {
return Optional.ofNullable(systemStreamRes);
}
+ public List<String> getStoreBackupManagerClassName(String storeName) {
+ List<String> storeBackupManagers = getList(String.format(STORE_BACKEND_BACKUP_FACTORIES, storeName), new ArrayList<>());
+ // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
+ if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+ storeBackupManagers = DEFAULT_STATE_BACKEND_BACKUP_FACTORIES;
+ }
Review comment:
For the first phase of the migration, we default to restoring via Kafka, and the changelog will be enabled by default for all stores, with a dual commit to both the blob store and the changelog.
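A stdlib-only sketch of the fallback discussed above, using a plain `Map<String, String>` in place of Samza's `Config`. The key formats `stores.%s.backup.factories` and `stores.%s.changelog`, the comma-separated list parsing, and the default factory list are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Sketch of the backup-factory fallback over a plain key/value config. */
final class StorageConfigSketch {
  // Hypothetical key formats, for illustration only.
  static final String STORE_BACKEND_BACKUP_FACTORIES = "stores.%s.backup.factories";
  static final String STORE_CHANGELOG = "stores.%s.changelog";
  // Assumed default: back up through the Kafka changelog state backend.
  static final List<String> DEFAULT_STATE_BACKEND_BACKUP_FACTORIES =
      Collections.singletonList("org.apache.samza.storage.KafkaChangelogStateBackendFactory");

  private final Map<String, String> config;

  StorageConfigSketch(Map<String, String> config) {
    this.config = config;
  }

  Optional<String> getChangelogStream(String storeName) {
    String value = config.get(String.format(STORE_CHANGELOG, storeName));
    return (value == null || value.trim().isEmpty()) ? Optional.empty() : Optional.of(value.trim());
  }

  List<String> getStoreBackupManagerClassName(String storeName) {
    List<String> factories = new ArrayList<>();
    String raw = config.get(String.format(STORE_BACKEND_BACKUP_FACTORIES, storeName));
    if (raw != null) {
      // Comma-separated list of factory class names.
      for (String name : raw.split(",")) {
        if (!name.trim().isEmpty()) {
          factories.add(name.trim());
        }
      }
    }
    // Backwards compatibility: nothing configured explicitly, but the store has a
    // changelog, so default to the Kafka changelog backup factory.
    if (factories.isEmpty() && getChangelogStream(storeName).isPresent()) {
      return DEFAULT_STATE_BACKEND_BACKUP_FACTORIES;
    }
    return factories;
  }
}
```

An explicitly configured backup factory always wins; the Kafka default only applies when nothing is configured and a changelog exists.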
##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
[email protected]
+public class KafkaStateCheckpointMarker {
+ public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+ public static final String SEPARATOR = ";";
+
+ private final SystemStreamPartition changelogSSP;
+ private final String changelogOffset;
+
+ public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+ this.changelogSSP = changelogSSP;
+ this.changelogOffset = changelogOffset;
+ }
+
+ public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+ if (StringUtils.isBlank(stateCheckpointMarker)) {
+ throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+ }
+ String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+ if (payload.length != 4) {
+ throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+ }
+
+ String system = payload[0];
+ String stream = payload[1];
+ Partition partition = new Partition(Integer.parseInt(payload[2]));
Review comment:
Will wrap this in an IllegalArgumentException (IAE).
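A sketch of the parse with the wrap applied, using a simplified marker class whose name and fields are hypothetical. Since `NumberFormatException` already extends `IllegalArgumentException`, the wrap mainly adds the full marker string to the message while keeping the original exception as the cause.

```java
/** Hypothetical simplified marker showing the partition parse wrapped in an IAE. */
final class ChangelogMarker {
  static final String SEPARATOR = ";";

  final String system;
  final String stream;
  final int partition;
  final String offset; // null when the serialized offset was "null"

  private ChangelogMarker(String system, String stream, int partition, String offset) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
    this.offset = offset;
  }

  static ChangelogMarker fromString(String marker) {
    if (marker == null || marker.trim().isEmpty()) {
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + marker);
    }
    String[] payload = marker.split(SEPARATOR);
    if (payload.length != 4) {
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + marker);
    }
    int partition;
    try {
      partition = Integer.parseInt(payload[2]);
    } catch (NumberFormatException e) {
      // Re-throw with the whole marker in the message; keep the original as the cause.
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker partition: " + marker, e);
    }
    String offset = "null".equals(payload[3]) ? null : payload[3];
    return new ChangelogMarker(payload[0], payload[1], partition, offset);
  }
}
```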
##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
Review comment:
A version field makes sense here; will add one.
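One possible shape for a versioned marker, sketched under the assumption that the version is written as a leading field (`version;system;stream;partition;offset`); the field position, the class name, and `MARKER_VERSION = 1` are hypothetical. Rejecting unknown versions on read lets a future format change fail loudly instead of being misparsed.

```java
/** Hypothetical versioned marker: "version;system;stream;partition;offset". */
final class VersionedKafkaMarker {
  static final short MARKER_VERSION = 1;
  static final String SEPARATOR = ";";

  final String system;
  final String stream;
  final int partition;
  final String offset; // null when the changelog partition had no offset

  VersionedKafkaMarker(String system, String stream, int partition, String offset) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
    this.offset = offset;
  }

  /** Writes the current version first; a null offset is written as "null". */
  @Override
  public String toString() {
    return String.join(SEPARATOR, String.valueOf(MARKER_VERSION), system, stream,
        String.valueOf(partition), String.valueOf(offset));
  }

  static VersionedKafkaMarker fromString(String marker) {
    if (marker == null || marker.trim().isEmpty()) {
      throw new IllegalArgumentException("Invalid marker: " + marker);
    }
    String[] payload = marker.split(SEPARATOR);
    if (payload.length != 5) {
      throw new IllegalArgumentException("Invalid marker parts count: " + marker);
    }
    try {
      short version = Short.parseShort(payload[0]);
      if (version != MARKER_VERSION) {
        // Not caught below: only NumberFormatException is wrapped.
        throw new IllegalArgumentException("Unsupported marker version " + version + ": " + marker);
      }
      int partition = Integer.parseInt(payload[3]);
      String offset = "null".equals(payload[4]) ? null : payload[4];
      return new VersionedKafkaMarker(payload[1], payload[2], partition, offset);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid marker: " + marker, e);
    }
  }
}
```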
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+
+public class CheckpointV2 implements Checkpoint {
+ public static final short CHECKPOINT_VERSION = 2;
+
+ private final CheckpointId checkpointId;
+ private final Map<SystemStreamPartition, String> inputOffsets;
+ private final Map<String, Map<String, String>> stateCheckpointMarkers;
+
+ /**
+ * Constructs the checkpoint with separated input and state offsets
+ *
+ * @param checkpointId {@link CheckpointId} associated with this checkpoint
+ * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+ * @param stateCheckpoints Map of state backend factory name to map of local state store names
+ * to state checkpoints
+ */
+ public CheckpointV2(CheckpointId checkpointId,
+ Map<SystemStreamPartition, String> inputOffsets,
+ Map<String, Map<String, String>> stateCheckpoints) {
+ this.checkpointId = checkpointId;
+ this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
+ this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);
Review comment:
Good point; they should not be null. Adding invariant checks.
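A sketch of the constructor with explicit invariant checks, using `String` stand-ins for `CheckpointId` and `SystemStreamPartition` and `Collections.unmodifiableMap` instead of Guava (all assumptions made to keep the example stdlib-only). Guava's `ImmutableMap.copyOf` would already throw a `NullPointerException` for a null map, but `Objects.requireNonNull` names the offending argument in the message.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical CheckpointV2 stand-in with non-null invariant checks. */
final class CheckpointV2Sketch {
  static final short CHECKPOINT_VERSION = 2;

  private final String checkpointId; // stand-in for CheckpointId
  private final Map<String, String> inputOffsets; // stand-in: SSP rendered as a String
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  CheckpointV2Sketch(String checkpointId,
      Map<String, String> inputOffsets,
      Map<String, Map<String, String>> stateCheckpoints) {
    this.checkpointId = Objects.requireNonNull(checkpointId, "checkpointId must not be null");
    Objects.requireNonNull(inputOffsets, "inputOffsets must not be null");
    Objects.requireNonNull(stateCheckpoints, "stateCheckpoints must not be null");
    // Defensive, unmodifiable copies so later caller mutations are not visible.
    this.inputOffsets = Collections.unmodifiableMap(new HashMap<>(inputOffsets));
    this.stateCheckpointMarkers = Collections.unmodifiableMap(new HashMap<>(stateCheckpoints));
  }

  String getCheckpointId() {
    return checkpointId;
  }

  Map<String, String> getInputOffsets() {
    return inputOffsets;
  }

  Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```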
##########
File path: samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.util.Map;
+
+/**
+ * Used for Json serialization of the {@link org.apache.samza.checkpoint.Checkpoint} class by the
+ * {@link CheckpointV2Serde}
+ * This cannot be an internal class as required by Jackson Object mapper
+ */
+public class JsonCheckpoint {
+ private String checkpointId;
+ private Map<String, Map<String, String>> inputOffsets;
+ // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>
Review comment:
Will change this to the Jackson mixin model.
##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
[email protected]
+public class KafkaStateCheckpointMarker {
+ public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+ public static final String SEPARATOR = ";";
+
+ private final SystemStreamPartition changelogSSP;
+ private final String changelogOffset;
+
+ public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+ this.changelogSSP = changelogSSP;
+ this.changelogOffset = changelogOffset;
+ }
+
+ public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+ if (StringUtils.isBlank(stateCheckpointMarker)) {
+ throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+ }
+ String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+ if (payload.length != 4) {
+ throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+ }
+
+ String system = payload[0];
+ String stream = payload[1];
+ Partition partition = new Partition(Integer.parseInt(payload[2]));
+ String offset = null;
+ if (!"null".equals(payload[3])) {
+ offset = payload[3];
+ }
+
+ return new KafkaStateCheckpointMarker(new SystemStreamPartition(system, stream, partition), offset);
+ }
+
+ public SystemStreamPartition getChangelogSSP() {
+ return changelogSSP;
+ }
+
+ public String getChangelogOffset() {
+ return changelogOffset;
+ }
+
+ /**
+ * Builds a map of store changelog SSPs to their offset for Kafka changelog backed stores from the provided
+ * map of state backend factory name to map of store name to serialized state checkpoint markers.
+ *
+ * @param stateBackendToStoreSCMs Map of state backend factory name to map of store name to serialized
+ * state checkpoint markers
+ * @return Map of store changelog SSPss to their optional offset, or an empty map if there is no mapping for
+ * {@link #KAFKA_STATE_BACKEND_FACTORY_NAME} in the input map. Optional offset may be empty if the
+ * changelog SSP was empty.
+ */
+ public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
+ Map<String, Map<String, String>> stateBackendToStoreSCMs) {
+ Map<SystemStreamPartition, Option<String>> sspToOffsetOptions = new HashMap<>();
+ if (stateBackendToStoreSCMs.containsKey(KAFKA_STATE_BACKEND_FACTORY_NAME)) {
+ Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
+ storeToKafkaSCMs.forEach((key, value) -> {
+ KafkaStateCheckpointMarker stateMarker = KafkaStateCheckpointMarker.fromString(value);
+ Option<String> offsetOption = Option.apply(stateMarker.getChangelogOffset());
+ sspToOffsetOptions.put(new SystemStreamPartition(stateMarker.getChangelogSSP()), offsetOption);
+ });
+ }
+ return sspToOffsetOptions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;
Review comment:
Will fix here and everywhere else.
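The `scmsToSSPOffsetMap` contract and `equals` above can be illustrated with this stdlib-only sketch. It assumes the changelog SSP is kept as a `system;stream;partition` string and uses `java.util.Optional` in place of `scala.Option` (`Option.apply(null)` and `Optional.ofNullable(null)` both yield an empty value). The class name is hypothetical, and the braces on every `if` body are a style choice here, not necessarily the fix the comment refers to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for KafkaStateCheckpointMarker. */
final class KafkaMarkerSketch {
  // Assumed to mirror KAFKA_STATE_BACKEND_FACTORY_NAME in the diff.
  static final String KAFKA_STATE_BACKEND_FACTORY_NAME =
      "org.apache.samza.storage.KafkaChangelogStateBackendFactory";
  static final String SEPARATOR = ";";

  final String changelogSSP; // stand-in: "system;stream;partition"
  final String changelogOffset; // may be null

  KafkaMarkerSketch(String changelogSSP, String changelogOffset) {
    this.changelogSSP = changelogSSP;
    this.changelogOffset = changelogOffset;
  }

  static KafkaMarkerSketch fromString(String marker) {
    String[] p = marker.split(SEPARATOR);
    if (p.length != 4) {
      throw new IllegalArgumentException("Invalid marker parts count: " + marker);
    }
    String offset = "null".equals(p[3]) ? null : p[3];
    return new KafkaMarkerSketch(p[0] + SEPARATOR + p[1] + SEPARATOR + p[2], offset);
  }

  /** Changelog SSP -> optional offset; empty map if there is no Kafka backend entry. */
  static Map<String, Optional<String>> scmsToSSPOffsetMap(
      Map<String, Map<String, String>> stateBackendToStoreSCMs) {
    Map<String, Optional<String>> result = new HashMap<>();
    Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
    if (storeToKafkaSCMs != null) {
      storeToKafkaSCMs.forEach((store, scm) -> {
        KafkaMarkerSketch marker = fromString(scm);
        result.put(marker.changelogSSP, Optional.ofNullable(marker.changelogOffset));
      });
    }
    return result;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    KafkaMarkerSketch that = (KafkaMarkerSketch) o;
    return Objects.equals(changelogSSP, that.changelogSSP)
        && Objects.equals(changelogOffset, that.changelogOffset);
  }

  @Override
  public int hashCode() {
    return Objects.hash(changelogSSP, changelogOffset);
  }
}
```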
--
This is an automated message from the Apache Git Service.