cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r283482010
##########
File path:
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
##########
@@ -43,64 +45,115 @@
/**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore}
*
* The intention for the StartpointManager is to maintain a strong contract between the caller
* and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator..
*/
public class StartpointManager {
- private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
- public static final String NAMESPACE = "samza-startpoint-v1";
+ public static final Integer VERSION = 1;
+ public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
+ private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
+ private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
+
+ private final boolean manageMetadataStoreLifecyle;
private final MetadataStore metadataStore;
+ private final NamespaceAwareCoordinatorStreamStore fanOutStore;
+ private final NamespaceAwareCoordinatorStreamStore readWriteStore;
private final StartpointSerde startpointSerde = new StartpointSerde();
- private boolean stopped = false;
+ private boolean stopped = true;
/**
* Constructs a {@link StartpointManager} instance by instantiating a new metadata store connection.
- * This is primarily used for testing.
+ * This should only be used for testing.
*/
@VisibleForTesting
StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) {
Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null");
Preconditions.checkNotNull(config, "Config cannot be null");
Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null");
+ this.manageMetadataStoreLifecyle = true;
Review comment:
Could you move the `getMetadataStore` call out to `StartpointManagerTestUtil`
and have `StartpointManagerTestUtil` use the regular constructor? That would
let you remove this constructor and all of the metadata store lifecycle logic,
which means less production code to maintain.
A test constructor is usually for injecting objects directly, not for helping
to build objects for a test. That helper role fits better in a test-only class
(i.e. `StartpointManagerTestUtil`).
Also, it looks like `manageMetadataStoreLifecycle` is only used in the test
case, so it would be nice to keep it out of production code. In my opinion,
tests that depend on `StartpointManager` should just mock it, so they don't
need to manage any lifecycle. Tests that use a real `StartpointManager` already
have to manage its lifecycle, so they could manage the `MetadataStore`
lifecycle too.
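A rough sketch of the shape I have in mind. This is not the real Samza code:
`MetadataStore`, `InMemoryMetadataStore` and `StartpointManager` below are
simplified, hypothetical stand-ins, the string-based `writeStartpoint` and
`readStartpoint` methods are made up for illustration, and I am assuming the
regular constructor accepts an already-built `MetadataStore`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Samza's MetadataStore; only a minimal lifecycle is modeled.
interface MetadataStore {
  void init();
  void put(String key, byte[] value);
  byte[] get(String key);
  void close();
}

// Hypothetical in-memory store that only tests would use.
class InMemoryMetadataStore implements MetadataStore {
  private final Map<String, byte[]> data = new HashMap<>();
  private boolean open = false;

  @Override public void init() { open = true; }
  @Override public void put(String key, byte[] value) { checkOpen(); data.put(key, value); }
  @Override public byte[] get(String key) { checkOpen(); return data.get(key); }
  @Override public void close() { open = false; }
  boolean isOpen() { return open; }

  private void checkOpen() {
    if (!open) {
      throw new IllegalStateException("MetadataStore is not open");
    }
  }
}

// Simplified stand-in for StartpointManager: one constructor, no store lifecycle flag.
class StartpointManager {
  private final MetadataStore metadataStore;
  private boolean stopped = true;

  StartpointManager(MetadataStore metadataStore) {
    this.metadataStore = Objects.requireNonNull(metadataStore, "MetadataStore cannot be null");
  }

  void start() { stopped = false; }

  // Stops the manager only; whoever created the store is responsible for closing it.
  void stop() { stopped = true; }

  void writeStartpoint(String key, String startpoint) {
    checkStarted();
    metadataStore.put(key, startpoint.getBytes(StandardCharsets.UTF_8));
  }

  String readStartpoint(String key) {
    checkStarted();
    byte[] bytes = metadataStore.get(key);
    return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
  }

  private void checkStarted() {
    if (stopped) {
      throw new IllegalStateException("StartpointManager is not started");
    }
  }
}

// Test-only helper: builds the store, passes it to the regular constructor,
// and owns both lifecycles so production code does not have to.
class StartpointManagerTestUtil {
  private final InMemoryMetadataStore metadataStore = new InMemoryMetadataStore();
  private final StartpointManager startpointManager;

  StartpointManagerTestUtil() {
    metadataStore.init();
    startpointManager = new StartpointManager(metadataStore);
    startpointManager.start();
  }

  StartpointManager getStartpointManager() { return startpointManager; }

  InMemoryMetadataStore getMetadataStore() { return metadataStore; }

  void stop() {
    startpointManager.stop();
    metadataStore.close();
  }
}
```

With something like this, a test would create a `StartpointManagerTestUtil`,
use `getStartpointManager()`, and call `stop()` on the util when it is done.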