prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r536338915
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
Checkpoint that = (Checkpoint) o;
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets !=
null) return false;
-
- return true;
+ return (checkpointId.equals(that.checkpointId)) &&
+ (Objects.equals(inputOffsets, that.inputOffsets)) &&
+ (Objects.equals(stateCheckpoint, that.stateCheckpoint));
}
@Override
public int hashCode() {
- return offsets != null ? offsets.hashCode() : 0;
+ return inputOffsets != null ? inputOffsets.hashCode() : 0;
}
@Override
public String toString() {
- return "Checkpoint [offsets=" + offsets + "]";
+ return "Checkpoint [inputOffsets=" + inputOffsets + "]";
Review comment:
Print checkpointId and stateCheckpointMarkers as well?
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
* Checkpointed changelog offset has the format: [checkpointId, offset],
separated by a colon.
*/
@InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
public static final String SEPARATOR = ":";
private final CheckpointId checkpointId;
- private final String offset;
+ private final String changelogOffset;
- public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset)
{
+ public KafkaStateChangelogOffset(CheckpointId checkpointId, String
changelogOffset) {
this.checkpointId = checkpointId;
- this.offset = offset;
+ this.changelogOffset = changelogOffset;
}
- public static CheckpointedChangelogOffset fromString(String message) {
+ public static KafkaStateChangelogOffset fromString(String message) {
if (StringUtils.isBlank(message)) {
throw new IllegalArgumentException("Invalid checkpointed changelog
message: " + message);
}
- String[] checkpointIdAndOffset = message.split(":");
- if (checkpointIdAndOffset.length != 2) {
+ String[] checkpointIdAndOffset = message.split(SEPARATOR);
+ if (checkpointIdAndOffset.length < 2 || checkpointIdAndOffset.length > 3) {
throw new IllegalArgumentException("Invalid checkpointed changelog
offset: " + message);
}
CheckpointId checkpointId =
CheckpointId.fromString(checkpointIdAndOffset[0]);
String offset = null;
if (!"null".equals(checkpointIdAndOffset[1])) {
offset = checkpointIdAndOffset[1];
}
- return new CheckpointedChangelogOffset(checkpointId, offset);
+
+ return new KafkaStateChangelogOffset(checkpointId, offset);
}
public CheckpointId getCheckpointId() {
return checkpointId;
}
- public String getOffset() {
- return offset;
+ public String getChangelogOffset() {
+ return changelogOffset;
}
@Override
public String toString() {
- return String.format("%s%s%s", checkpointId, SEPARATOR, offset);
+ return String.format("%s%s%s%s%s", checkpointId, SEPARATOR,
changelogOffset);
Review comment:
Remove extra %s
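For context: `String.format` throws `java.util.MissingFormatArgumentException` when the format string has more specifiers than arguments, so the five-`%s` version would fail when `toString()` is called rather than just print oddly. A minimal sketch, assuming plain strings as stand-ins for `CheckpointId` and the offset (the class and method names below are hypothetical, not part of the PR):

```java
import java.util.MissingFormatArgumentException;

final class ChangelogOffsetFormatDemo {
  static final String SEPARATOR = ":";

  // Three specifiers for three arguments: checkpointId, separator, offset.
  static String render(String checkpointId, String changelogOffset) {
    return String.format("%s%s%s", checkpointId, SEPARATOR, changelogOffset);
  }

  // Mirrors the format string in the diff: five specifiers, three arguments.
  static boolean brokenFormatThrows(String checkpointId, String changelogOffset) {
    try {
      String.format("%s%s%s%s%s", checkpointId, SEPARATOR, changelogOffset);
      return false;
    } catch (MissingFormatArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(render("cp-1", "42"));
    System.out.println(brokenFormatThrows("cp-1", "42"));
  }
}
```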
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +32,47 @@
* of restarting a failed container within a running job.
*/
public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
+ private static final short PROTOCOL_VERSION = 1;
+ private final CheckpointId checkpointId;
+ private final Map<SystemStreamPartition, String> inputOffsets;
+ private final Map<String, List<StateCheckpointMarker>> stateCheckpoint;
/**
* Constructs a new checkpoint based off a map of Samza stream offsets.
* @param offsets Map of Samza streams to their current offset.
*/
public Checkpoint(Map<SystemStreamPartition, String> offsets) {
- this.offsets = offsets;
+ this(CheckpointId.create(), offsets, null);
+ }
+
+ public Checkpoint(CheckpointId id, Map<SystemStreamPartition, String>
inputOffsets, Map<String, List<StateCheckpointMarker>> stateCheckpoint) {
+ this.checkpointId = id;
+ this.inputOffsets = inputOffsets;
+ this.stateCheckpoint = stateCheckpoint;
+ }
+
+ /**
+ * Gets the checkpoint id for the checkpoint
+ * @return The timestamp based checkpoint identifier associated with the
checkpoint
+ */
+ public CheckpointId getCheckpointId() {
+ return checkpointId;
}
/**
* Gets a unmodifiable view of the current Samza stream offsets.
* @return A unmodifiable view of a Map of Samza streams to their recorded
offsets.
*/
- public Map<SystemStreamPartition, String> getOffsets() {
- return Collections.unmodifiableMap(offsets);
+ public Map<SystemStreamPartition, String> getInputOffsets() {
+ return Collections.unmodifiableMap(inputOffsets);
+ }
+
+ /**
+ * Gets the stateCheckpointMarkers
+ * @return The state checkpoint markers for the checkpoint
+ */
+ public Map<String, List<StateCheckpointMarker>> getStateCheckpointMarkers() {
+ return stateCheckpoint;
Review comment:
Try to avoid returning nulls. Either return an empty map here
(preferred) or mark the method as @Nullable and clarify nullability in the
javadoc.
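One possible shape for the preferred option, as a rough sketch only: normalize a null argument to an empty map in the constructor so the getter never returns null. The class name is hypothetical and `String` stands in for `StateCheckpointMarker`, which is an assumption made to keep the example self-contained.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class NullSafeCheckpointSketch {
  private final Map<String, List<String>> stateCheckpointMarkers;

  NullSafeCheckpointSketch(Map<String, List<String>> stateCheckpointMarkers) {
    // Normalize null once, at construction, so callers never see a null map.
    this.stateCheckpointMarkers = stateCheckpointMarkers == null
        ? Collections.emptyMap()
        : Collections.unmodifiableMap(stateCheckpointMarkers);
  }

  // Always returns a non-null, unmodifiable view.
  Map<String, List<String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }

  public static void main(String[] args) {
    System.out.println(new NullSafeCheckpointSketch(null).getStateCheckpointMarkers().isEmpty());
  }
}
```

With this, the single-argument constructor in the diff could keep passing `null` and callers would still get an empty map back.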
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +32,47 @@
* of restarting a failed container within a running job.
*/
public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
+ private static final short PROTOCOL_VERSION = 1;
+ private final CheckpointId checkpointId;
+ private final Map<SystemStreamPartition, String> inputOffsets;
+ private final Map<String, List<StateCheckpointMarker>> stateCheckpoint;
Review comment:
Minor: stateCheckpoints (plural), here and everywhere else.
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
Checkpoint that = (Checkpoint) o;
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets !=
null) return false;
-
- return true;
+ return (checkpointId.equals(that.checkpointId)) &&
+ (Objects.equals(inputOffsets, that.inputOffsets)) &&
+ (Objects.equals(stateCheckpoint, that.stateCheckpoint));
}
@Override
public int hashCode() {
- return offsets != null ? offsets.hashCode() : 0;
+ return inputOffsets != null ? inputOffsets.hashCode() : 0;
Review comment:
hashCode should also be based on checkpointId and stateCheckpoints to
match equals implementation.
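A sketch of what that could look like, using `Objects.hash` over the same three fields that `equals` compares. Hashing only `inputOffsets` does not strictly violate the equals/hashCode contract, but checkpoints that differ only in id or state markers would all share one hash value. The class below is hypothetical, with `String` standing in for `CheckpointId`, `SystemStreamPartition` and `StateCheckpointMarker`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class CheckpointHashSketch {
  private final String checkpointId;
  private final Map<String, String> inputOffsets;
  private final Map<String, String> stateCheckpoints;

  CheckpointHashSketch(String checkpointId, Map<String, String> inputOffsets,
      Map<String, String> stateCheckpoints) {
    this.checkpointId = checkpointId;
    this.inputOffsets = inputOffsets;
    this.stateCheckpoints = stateCheckpoints;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    CheckpointHashSketch that = (CheckpointHashSketch) o;
    return Objects.equals(checkpointId, that.checkpointId)
        && Objects.equals(inputOffsets, that.inputOffsets)
        && Objects.equals(stateCheckpoints, that.stateCheckpoints);
  }

  @Override
  public int hashCode() {
    // Same fields as equals, so equal checkpoints always hash alike.
    return Objects.hash(checkpointId, inputOffsets, stateCheckpoints);
  }

  // Builds a small sample instance; the keys and values are made up.
  static CheckpointHashSketch sample(String id) {
    Map<String, String> offsets = new HashMap<>();
    offsets.put("ssp-0", "10");
    Map<String, String> state = new HashMap<>();
    state.put("store-a", "marker-1");
    return new CheckpointHashSketch(id, offsets, state);
  }

  public static void main(String[] args) {
    System.out.println(sample("cp-1").equals(sample("cp-1")));
    System.out.println(sample("cp-1").hashCode() == sample("cp-1").hashCode());
  }
}
```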
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
Checkpoint that = (Checkpoint) o;
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets !=
null) return false;
-
- return true;
+ return (checkpointId.equals(that.checkpointId)) &&
+ (Objects.equals(inputOffsets, that.inputOffsets)) &&
Review comment:
Does Objects.equals do a deep equality comparison for Map types or is it
only using default Map.equals (comparing references)?
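On the question itself: `Objects.equals(a, b)` only adds null handling and then delegates to `a.equals(b)`. The standard `Map` implementations (via `AbstractMap.equals`) compare the mappings by value rather than by reference, so the result depends on the keys and values having sensible `equals` methods. A small sketch with made-up keys and values (the class name is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class MapEqualityDemo {
  // Two distinct map instances, even of different classes, are equal
  // when they contain the same mappings.
  static boolean sameContentsAreEqual() {
    Map<String, String> first = new HashMap<>();
    first.put("ssp-0", "10");
    Map<String, String> second = new TreeMap<>();
    second.put("ssp-0", "10");
    return first != second && Objects.equals(first, second);
  }

  // A differing value makes the maps unequal.
  static boolean differentContentsAreUnequal() {
    Map<String, String> first = new HashMap<>();
    first.put("ssp-0", "10");
    Map<String, String> second = new HashMap<>();
    second.put("ssp-0", "11");
    return !Objects.equals(first, second);
  }

  // Objects.equals treats two nulls as equal and null vs non-null as unequal.
  static boolean nullHandling() {
    return Objects.equals(null, null)
        && !Objects.equals(null, new HashMap<String, String>());
  }

  public static void main(String[] args) {
    System.out.println(sameContentsAreEqual());
    System.out.println(differentContentsAreUnequal());
    System.out.println(nullHandling());
  }
}
```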
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
* Checkpointed changelog offset has the format: [checkpointId, offset],
separated by a colon.
*/
@InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
public static final String SEPARATOR = ":";
private final CheckpointId checkpointId;
Review comment:
Does this still need to contain the checkpointId? Is this for backcompat?
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +32,47 @@
* of restarting a failed container within a running job.
*/
public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
+ private static final short PROTOCOL_VERSION = 1;
+ private final CheckpointId checkpointId;
+ private final Map<SystemStreamPartition, String> inputOffsets;
+ private final Map<String, List<StateCheckpointMarker>> stateCheckpoint;
/**
* Constructs a new checkpoint based off a map of Samza stream offsets.
* @param offsets Map of Samza streams to their current offset.
*/
public Checkpoint(Map<SystemStreamPartition, String> offsets) {
Review comment:
Minor: inputOffsets (param name)
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
* Checkpointed changelog offset has the format: [checkpointId, offset],
separated by a colon.
*/
@InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
public static final String SEPARATOR = ":";
private final CheckpointId checkpointId;
- private final String offset;
+ private final String changelogOffset;
- public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset)
{
+ public KafkaStateChangelogOffset(CheckpointId checkpointId, String
changelogOffset) {
this.checkpointId = checkpointId;
- this.offset = offset;
+ this.changelogOffset = changelogOffset;
}
- public static CheckpointedChangelogOffset fromString(String message) {
+ public static KafkaStateChangelogOffset fromString(String message) {
if (StringUtils.isBlank(message)) {
throw new IllegalArgumentException("Invalid checkpointed changelog
message: " + message);
}
- String[] checkpointIdAndOffset = message.split(":");
- if (checkpointIdAndOffset.length != 2) {
+ String[] checkpointIdAndOffset = message.split(SEPARATOR);
+ if (checkpointIdAndOffset.length < 2 || checkpointIdAndOffset.length > 3) {
Review comment:
Why relax this check?
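To make the effect of the relaxed check concrete: the parsing code in the diff only reads indices 0 and 1, so a message with a third segment would now be accepted and its last segment silently dropped. A small sketch of how `String.split` counts segments, with made-up messages and a hypothetical strict parser that keeps the original `!= 2` rule:

```java
final class SplitLengthDemo {
  static final String SEPARATOR = ":";

  // Number of segments String.split produces for a message.
  static int segments(String message) {
    return message.split(SEPARATOR).length;
  }

  // Strict variant: exactly [checkpointId, offset], nothing else.
  static String[] parseStrict(String message) {
    String[] parts = message.split(SEPARATOR);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
    }
    return parts;
  }

  public static void main(String[] args) {
    System.out.println(segments("cp-1:42"));        // two segments
    System.out.println(segments("cp-1:42:extra"));  // three segments
    System.out.println(segments("cp-1:42:"));       // trailing empty segment is dropped by split
  }
}
```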
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
+ public static final String SEPARATOR = ";";
+ // backwards compatibility for this unstable api
+ private static final short PROTOCOL_VERSION = 0;
+
+ // blob store location id obtained after upload
+ private final String blobId;
+ // timestamp of when the upload was completed
+ private final long createdMillis;
+
+ public RemoteStoreMetadata(String blobId, long createdMillis) {
+ this.blobId = blobId;
+ this.createdMillis = createdMillis;
+ }
+
+ public String getBlobId() {
+ return blobId;
+ }
+
+ public long getCreatedMillis() {
+ return createdMillis;
+ }
+
+ public short getProtocolVersion() {
+ return PROTOCOL_VERSION;
+ }
+
+ public static RemoteStoreMetadata fromString(String message) {
Review comment:
Do we still need this method if we have an explicit
StateCheckpointMarkerSerde?
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
+ public static final String SEPARATOR = ";";
+ // backwards compatibility for this unstable api
+ private static final short PROTOCOL_VERSION = 0;
Review comment:
Minor: Start with 1.
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
+ public static final String SEPARATOR = ";";
+ // backwards compatibility for this unstable api
+ private static final short PROTOCOL_VERSION = 0;
+
+ // blob store location id obtained after upload
+ private final String blobId;
+ // timestamp of when the upload was completed
+ private final long createdMillis;
+
+ public RemoteStoreMetadata(String blobId, long createdMillis) {
+ this.blobId = blobId;
+ this.createdMillis = createdMillis;
+ }
+
+ public String getBlobId() {
+ return blobId;
+ }
+
+ public long getCreatedMillis() {
+ return createdMillis;
+ }
+
+ public short getProtocolVersion() {
+ return PROTOCOL_VERSION;
+ }
+
+ public static RemoteStoreMetadata fromString(String message) {
+ if (StringUtils.isBlank(message)) {
+ throw new IllegalArgumentException("Invalid remote store checkpoint
message: " + message);
+ }
+ String[] parts = message.split(SEPARATOR);
+ if (parts.length != 3) {
+ throw new IllegalArgumentException("Invalid checkpointed changelog
offset: " + message);
Review comment:
Fix message.
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
Review comment:
BlobStoreStateCheckpointMarker?
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarker.java
##########
@@ -17,21 +17,16 @@
* under the License.
*/
-package org.apache.samza.storage
+package org.apache.samza.checkpoint;
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StateBackendFactory;
-trait TaskStorageManager {
- def getStore(storeName: String): Option[StorageEngine]
-
- def flush(): Map[SystemStreamPartition, Option[String]]
-
- def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets:
Map[SystemStreamPartition, Option[String]]): Unit
-
- def removeOldCheckpoints(checkpointId: CheckpointId): Unit
-
- def stop(): Unit
+/**
+ * Interface for State Checkpoint Marker for all TaskStorageBackupManagers
+ */
+public interface StateCheckpointMarker {
Review comment:
Should KafkaCheckpointMarker and BlobStoreCheckpointMarker implement
this interface?
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/StateCheckpointMarker.java
##########
@@ -17,21 +17,16 @@
* under the License.
*/
-package org.apache.samza.storage
+package org.apache.samza.checkpoint;
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.storage.StateBackendFactory;
-trait TaskStorageManager {
- def getStore(storeName: String): Option[StorageEngine]
-
- def flush(): Map[SystemStreamPartition, Option[String]]
-
- def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets:
Map[SystemStreamPartition, Option[String]]): Unit
-
- def removeOldCheckpoints(checkpointId: CheckpointId): Unit
-
- def stop(): Unit
+/**
+ * Interface for State Checkpoint Marker for all TaskStorageBackupManagers
+ */
+public interface StateCheckpointMarker {
+ String getFactoryName();
-}
\ No newline at end of file
+ StateBackendFactory getFactory();
Review comment:
Why does this need to return the actual factory instance?
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskStorageAdmin.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+public interface TaskStorageAdmin {
+
+ void createResources();
Review comment:
Also add validateResources
##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -240,6 +251,10 @@ public long getChangelogMinCompactionLagMs(String
storeName) {
return getLong(minCompactLagConfigName,
getDefaultChangelogMinCompactionLagMs());
}
+ public String getStateRestoreBackupManager() {
+ return get(STATE_BACKUP_MANAGER_FACTORY,
DEFAULT_STATE_BACKUP_MANAGER_FACTORY);
Review comment:
s/getStateRestoreBackupManager/getStateBackupManager
##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -55,18 +82,18 @@ public boolean equals(Object o) {
Checkpoint that = (Checkpoint) o;
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets !=
null) return false;
-
- return true;
+ return (checkpointId.equals(that.checkpointId)) &&
Review comment:
Need to include protocol version in equals and hashcode?
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.util.Clock;
+
+
+public interface StateBackendFactory {
+ TaskStorageBackupManager getBackupManager(TaskModel taskModel,
Review comment:
Minor: s/TaskStorageBackupManager/TaskBackupManager or
TaskStateBackupManager, and make it consistent with TaskRestoreManager naming
convention.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ * <li>Snapshot will be called before Upload.</li>
+ * <li>persistToFilesystem will be called after Upload is completed</li>
+ * <li>Cleanup is only called after Upload and persistToFilesystem has
successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+ /**
+ * Commit operation that is synchronous to processing
+ * @param checkpointId Checkpoint id of the current commit
+ * @return The storename to checkpoint of the snapshotted local store
+ */
+ Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+ /**
+ * Commit operation that is asynchronous to message processing,
+ * @param checkpointId Checkpoint id of the current commit
+ * @param stateCheckpointMarkers The map of storename to checkpoint makers
returned by the snapshot
+ * @return The future of storename to checkpoint map of the uploaded local
store
+ */
+ CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId
checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+ /**
+ * Persist the state locally to the file system
+ * @param checkpointId The id of the checkpoint to be committed
+ * @param stateCheckpointMarkers Uploaded storename to checkpoints markers
to be persisted locally
+ */
+ void persistToFilesystem(CheckpointId checkpointId, Map<String,
StateCheckpointMarker> stateCheckpointMarkers);
Review comment:
Do you need input offsets here too (e.g. to write the offsets file)? If so,
it might be worth passing the entire `Checkpoint` instance.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.util.Clock;
+
+
+public interface StateBackendFactory {
+ TaskStorageBackupManager getBackupManager(TaskModel taskModel,
Review comment:
Also pass the jobModel, containerModel and clock, for
consistency with the restore manager interface.
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
* Checkpointed changelog offset has the format: [checkpointId, offset],
separated by a colon.
*/
@InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {
Review comment:
IIUC, this should implement StateCheckpointMarker interface?
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ * <li>Snapshot will be called before Upload.</li>
+ * <li>persistToFilesystem will be called after Upload is completed</li>
+ * <li>Cleanup is only called after Upload and persistToFilesystem has
successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+ /**
+ * Commit operation that is synchronous to processing
+ * @param checkpointId Checkpoint id of the current commit
+ * @return The storename to checkpoint of the snapshotted local store
+ */
+ Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+ /**
+ * Commit operation that is asynchronous to message processing,
+ * @param checkpointId Checkpoint id of the current commit
+ * @param stateCheckpointMarkers The map of storename to checkpoint makers
returned by the snapshot
+ * @return The future of storename to checkpoint map of the uploaded local
store
+ */
+ CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId
checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+ /**
+ * Persist the state locally to the file system
+ * @param checkpointId The id of the checkpoint to be committed
+ * @param stateCheckpointMarkers Uploaded storename to checkpoints markers
to be persisted locally
+ */
+ void persistToFilesystem(CheckpointId checkpointId, Map<String,
StateCheckpointMarker> stateCheckpointMarkers);
+
+ /**
+ * Cleanup any local or remote state for obsolete checkpoint information
that are older than checkpointId
+ * @param checkpointId The id of the latest successfully committed checkpoint
+ */
+ void cleanUp(CheckpointId checkpointId);
Review comment:
Need markers here? E.g. to find index blobIds etc.
##########
File path:
samza-api/src/main/java/org/apache/samza/checkpoint/RemoteStoreMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+
+
+public class RemoteStoreMetadata {
Review comment:
implements StateCheckpointMarker?
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -543,6 +540,10 @@ object SamzaContainer extends Logging {
storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
+ val stateStorageBackendFactory = {
Review comment:
Minor: Don't need curly braces.
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/TaskStorageBackupManager.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskStorageBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ * <li>Snapshot will be called before Upload.</li>
+ * <li>persistToFilesystem will be called after Upload is completed</li>
+ * <li>Cleanup is only called after Upload and persistToFilesystem has
successfully completed</li>
+ * </ul>
+ */
+public interface TaskStorageBackupManager {
+
+ /**
+ * Commit operation that is synchronous to processing
+ * @param checkpointId Checkpoint id of the current commit
+ * @return The storename to checkpoint of the snapshotted local store
+ */
+ Map<String, StateCheckpointMarker> snapshot(CheckpointId checkpointId);
+
+ /**
+ * Commit operation that is asynchronous to message processing,
+ * @param checkpointId Checkpoint id of the current commit
+ * @param stateCheckpointMarkers The map of storename to checkpoint makers
returned by the snapshot
+ * @return The future of storename to checkpoint map of the uploaded local
store
+ */
+ CompletableFuture<Map<String, StateCheckpointMarker>> upload(CheckpointId
checkpointId, Map<String, StateCheckpointMarker> stateCheckpointMarkers);
+
+ /**
+ * Persist the state locally to the file system
+ * @param checkpointId The id of the checkpoint to be committed
+ * @param stateCheckpointMarkers Uploaded storename to checkpoints markers
to be persisted locally
+ */
+ void persistToFilesystem(CheckpointId checkpointId, Map<String,
StateCheckpointMarker> stateCheckpointMarkers);
+
+ /**
+ * Cleanup any local or remote state for obsolete checkpoint information
that are older than checkpointId
+ * @param checkpointId The id of the latest successfully committed checkpoint
+ */
+ void cleanUp(CheckpointId checkpointId);
+
+ /**
+ * Used for testing as a shutdown hook to cleanup any allocated resources
+ */
+ void stop();
Review comment:
Should also have a corresponding start method. Prefer init/close for
consistency with most other samza methods.
##########
File path: samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
##########
@@ -38,6 +42,11 @@
*/
public interface StorageEngine {
+ /**
+ * Initiate storage engine
+ */
+ void init(ContainerContext containerContext, JobContext jobContext,
ExternalContext externalContext);
Review comment:
Nitpick: order the params as External, Job, Container to follow the
hierarchy. Same for the StateBackendFactory method params (JobModel,
ContainerModel, TaskModel).
##########
File path:
samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
##########
@@ -35,6 +38,18 @@
* @param <V> the type of values maintained by this key-value store.
*/
public interface KeyValueStore<K, V> {
+
+ /**
+ * Initiates the KeyValueStore
+ *
+ * @param containerContext context of the KeyValueStore's container
+ * @param jobContext context of the job the KeyValueStore is in
+ * @param externalContext any external store required for initialization
+ */
+ default void init(ContainerContext containerContext, JobContext jobContext,
ExternalContext externalContext) {
+ throw new UnsupportedOperationException("init() is not supported in " +
this.getClass().getName());
Review comment:
Would it be better to make this a no-op instead of throwing an exception?
That way we can always call init at the right place in the store lifecycle.
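A rough sketch of the no-op alternative, under simplifying assumptions: a single `String` parameter stands in for the three context objects, and all type names below are hypothetical. With an empty default body, lifecycle code can call `init` on every store unconditionally, and only stores that need setup override it.

```java
interface InitializableStore {
  // No-op default: stores with nothing to set up simply inherit this.
  default void init(String containerName) { }
}

// An existing implementation that was written before init() existed.
final class LegacyStore implements InitializableStore { }

// An implementation that opts in to initialization.
final class TrackingStore implements InitializableStore {
  boolean initialized = false;

  @Override
  public void init(String containerName) {
    initialized = true;
  }
}

final class StoreLifecycleDemo {
  // Calls init on every store; no instanceof checks or try/catch needed.
  static int initAll(InitializableStore... stores) {
    for (InitializableStore store : stores) {
      store.init("container-0");
    }
    return stores.length;
  }

  public static void main(String[] args) {
    TrackingStore tracking = new TrackingStore();
    System.out.println(initAll(new LegacyStore(), tracking));
    System.out.println(tracking.initialized);
  }
}
```

With the throwing default from the diff, the same loop would fail on `LegacyStore` with `UnsupportedOperationException`.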