This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new edaaec2  Remove register(ssp, startpoint) API from SystemConsumer and 
its usages. (#1022)
edaaec2 is described below

commit edaaec29932d8f5885b015032615cb839f73761f
Author: shanthoosh <[email protected]>
AuthorDate: Wed May 8 15:14:55 2019 -0700

    Remove register(ssp, startpoint) API from SystemConsumer and its usages. 
(#1022)
---
 .../main/java/org/apache/samza/system/SystemConsumer.java | 15 ---------------
 .../java/org/apache/samza/util/BlockingEnvelopeMap.java   |  9 ---------
 .../scala/org/apache/samza/system/SystemConsumers.scala   | 14 +++++---------
 .../org/apache/samza/system/TestSystemConsumers.scala     |  5 -----
 4 files changed, 5 insertions(+), 38 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java 
b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
index c89603d..56879fe 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
@@ -22,8 +22,6 @@ package org.apache.samza.system;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.startpoint.Startpoint;
 
 /**
  * <p>
@@ -139,19 +137,6 @@ public interface SystemConsumer {
   void register(SystemStreamPartition systemStreamPartition, String offset);
 
   /**
-   * Registers the {@link Startpoint} to the SystemConsumer. SystemConsumer
-   * should read the messages from all the registered SystemStreamPartitions.
-   * SystemStreamPartitions should be registered before the start is called.
-   *
-   * @param systemStreamPartition represents the SystemStreamPartition to be 
registered.
-   * @param startpoint represents the position in the SystemStreamPartition.
-   */
-  @InterfaceStability.Evolving
-  default void register(SystemStreamPartition systemStreamPartition, 
Startpoint startpoint) {
-    throw new UnsupportedOperationException(String.format("Registering the 
ssp: %s with startpoint: %s is not supported.", systemStreamPartition, 
startpoint));
-  }
-
-  /**
    * Poll the SystemConsumer to get any available messages from the underlying
    * system.
    *
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index e284cef..79f340f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.startpoint.Startpoint;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -104,14 +103,6 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
   }
 
   /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void register(SystemStreamPartition systemStreamPartition, Startpoint 
startpoint) {
-    initializeInternalStateForSSP(systemStreamPartition);
-  }
-
-  /**
    * Initializes the metrics and in-memory buffer for the {@param 
systemStreamPartition}.
    * @param systemStreamPartition represents the input system stream partition.
    */
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 971ded3..05f423d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -35,7 +35,6 @@ import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
 
-
 object SystemConsumers {
   val DEFAULT_POLL_INTERVAL_MS = 50
   val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
@@ -220,14 +219,11 @@ class SystemConsumers (
 
     try {
       val consumer = consumers(systemStreamPartition.getSystem)
-      if (startpoint != null) {
-        consumer.register(systemStreamPartition, startpoint)
-      } else {
-        val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
-        val systemAdmin = 
systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
-        if (existingOffset == null || 
systemAdmin.offsetComparator(existingOffset, offset) > 0) {
-          sspToRegisteredOffsets.put(systemStreamPartition, offset)
-        }
+      val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+      val systemAdmin = 
systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+      val offsetComparisonResult = 
systemAdmin.offsetComparator(existingOffset, offset)
+      if (existingOffset == null || (offsetComparisonResult != null && 
offsetComparisonResult > 0)) {
+        sspToRegisteredOffsets.put(systemStreamPartition, offset)
       }
     } catch {
       case e: NoSuchElementException => throw new 
SystemConsumersException("can't register " + systemStreamPartition.getSystem + 
"'s consumer.", e)
diff --git 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 15e2627..875c4a9 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -422,11 +422,6 @@ class TestSystemConsumers {
     override def register(systemStreamPartition: SystemStreamPartition, 
offset: String): Unit = {
        super[BlockingEnvelopeMap].register(systemStreamPartition, offset)
     }
-
-    override def register(systemStreamPartition: SystemStreamPartition, 
startpoint: Startpoint): Unit = {
-      super[BlockingEnvelopeMap].register(systemStreamPartition, startpoint)
-    }
-
   }
 }
 

Reply via email to