cameronlee314 commented on a change in pull request #1027: SAMZA-2046: Startpoint fan out implementation
URL: https://github.com/apache/samza/pull/1027#discussion_r284407812
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
 ##########
 @@ -20,87 +20,117 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashSet;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The StartpointManager reads and writes {@link Startpoint} to the {@link 
MetadataStore} defined by
- * the configuration task.startpoint.metadata.store.factory.
- *
- * Startpoints are keyed in the MetadataStore by two different formats:
- * 1) Only by {@link SystemStreamPartition}
- * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ * The StartpointManager reads and writes {@link Startpoint} to the provided 
{@link MetadataStore}
  *
  * The intention for the StartpointManager is to maintain a strong contract 
between the caller
  * and how Startpoints are stored in the underlying MetadataStore.
+ *
+ * Startpoints are written in the MetadataStore using keys of two different 
formats:
+ * 1) {@link SystemStreamPartition} only
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * Startpoints are then fanned out to a fan out namespace in the MetadataStore 
by the
+ * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the 
standalone
+ * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
+ * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set 
the starting offsets per task and per
+ * {@link SystemStreamPartition}. The fan outs are deleted once the offsets 
are committed to the checkpoint.
+ *
+ * The read, write and delete methods are intended for external callers.
+ * The fan out methods are intended to be used within a job coordinator.
  */
 public class StartpointManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
-  public static final String NAMESPACE = "samza-startpoint-v1";
+  private static final Integer VERSION = 1;
+  public static final String NAMESPACE = "samza-startpoint-v" + VERSION;
 
   static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
 
-  private final MetadataStore metadataStore;
-  private final StartpointSerde startpointSerde = new StartpointSerde();
-
-  private boolean stopped = false;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StartpointManager.class);
+  private static final String NAMESPACE_FAN_OUT = NAMESPACE + "-fan-out";
 
-  /**
-   *  Constructs a {@link StartpointManager} instance by instantiating a new 
metadata store connection.
-   *  This is primarily used for testing.
-   */
-  @VisibleForTesting
-  StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, 
MetricsRegistry metricsRegistry) {
-    Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory 
cannot be null");
-    Preconditions.checkNotNull(config, "Config cannot be null");
-    Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be 
null");
+  private final MetadataStore metadataStore;
+  private final NamespaceAwareCoordinatorStreamStore fanOutStore;
+  private final NamespaceAwareCoordinatorStreamStore readWriteStore;
+  private final ObjectMapper objectMapper = 
StartpointObjectMapper.getObjectMapper();
 
-    this.metadataStore = metadataStoreFactory.getMetadataStore(NAMESPACE, 
config, metricsRegistry);
-    LOG.info("StartpointManager created with metadata store: {}", 
metadataStore.getClass().getCanonicalName());
-    this.metadataStore.init();
-  }
+  private boolean stopped = true;
 
   /**
    *  Builds the StartpointManager based upon the provided {@link 
MetadataStore} that is instantiated.
    *  Setting up a metadata store instance is expensive which requires opening 
multiple connections
-   *  and reading tons of information. Fully instantiated metadata store is 
taken as a constructor argument
+   *  and reading tons of information. Fully instantiated metadata store is 
passed in as a constructor argument
    *  to reuse it across different utility classes.
    *
    * @param metadataStore an instance of {@link MetadataStore} used to 
read/write the start-points.
    */
   public StartpointManager(MetadataStore metadataStore) {
     Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
-    this.metadataStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE);
+
+    this.metadataStore = metadataStore;
+    this.readWriteStore = new 
NamespaceAwareCoordinatorStreamStore(metadataStore, NAMESPACE);
+    this.fanOutStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, 
NAMESPACE_FAN_OUT);
+    LOG.info("Lifecycle of metadata store: {} is managed externally", 
metadataStore.getClass().getCanonicalName());
 
 Review comment:
   Minor: unnecessary log?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to