This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new edaaec2 Remove register(ssp, startpoint) API from SystemConsumer and its usages. (#1022)
edaaec2 is described below
commit edaaec29932d8f5885b015032615cb839f73761f
Author: shanthoosh <[email protected]>
AuthorDate: Wed May 8 15:14:55 2019 -0700
Remove register(ssp, startpoint) API from SystemConsumer and its usages. (#1022)
---
.../main/java/org/apache/samza/system/SystemConsumer.java | 15 ---------------
.../java/org/apache/samza/util/BlockingEnvelopeMap.java | 9 ---------
.../scala/org/apache/samza/system/SystemConsumers.scala | 14 +++++---------
.../org/apache/samza/system/TestSystemConsumers.scala | 5 -----
4 files changed, 5 insertions(+), 38 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
index c89603d..56879fe 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
@@ -22,8 +22,6 @@ package org.apache.samza.system;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.startpoint.Startpoint;
/**
* <p>
@@ -139,19 +137,6 @@ public interface SystemConsumer {
void register(SystemStreamPartition systemStreamPartition, String offset);
/**
- * Registers the {@link Startpoint} to the SystemConsumer. SystemConsumer
- * should read the messages from all the registered SystemStreamPartitions.
- * SystemStreamPartitions should be registered before the start is called.
- *
- * @param systemStreamPartition represents the SystemStreamPartition to be
registered.
- * @param startpoint represents the position in the SystemStreamPartition.
- */
- @InterfaceStability.Evolving
- default void register(SystemStreamPartition systemStreamPartition,
Startpoint startpoint) {
- throw new UnsupportedOperationException(String.format("Registering the
ssp: %s with startpoint: %s is not supported.", systemStreamPartition,
startpoint));
- }
-
- /**
* Poll the SystemConsumer to get any available messages from the underlying
* system.
*
diff --git
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index e284cef..79f340f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
@@ -104,14 +103,6 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
}
/**
- * {@inheritDoc}
- */
- @Override
- public void register(SystemStreamPartition systemStreamPartition, Startpoint
startpoint) {
- initializeInternalStateForSSP(systemStreamPartition);
- }
-
- /**
* Initializes the metrics and in-memory buffer for the {@param
systemStreamPartition}.
* @param systemStreamPartition represents the input system stream partition.
*/
diff --git
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 971ded3..05f423d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -35,7 +35,6 @@ import org.apache.samza.startpoint.Startpoint
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.SamzaException
-
object SystemConsumers {
val DEFAULT_POLL_INTERVAL_MS = 50
val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
@@ -220,14 +219,11 @@ class SystemConsumers (
try {
val consumer = consumers(systemStreamPartition.getSystem)
- if (startpoint != null) {
- consumer.register(systemStreamPartition, startpoint)
- } else {
- val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
- val systemAdmin =
systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
- if (existingOffset == null ||
systemAdmin.offsetComparator(existingOffset, offset) > 0) {
- sspToRegisteredOffsets.put(systemStreamPartition, offset)
- }
+ val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+ val systemAdmin =
systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+ val offsetComparisonResult =
systemAdmin.offsetComparator(existingOffset, offset)
+ if (existingOffset == null || (offsetComparisonResult != null &&
offsetComparisonResult > 0)) {
+ sspToRegisteredOffsets.put(systemStreamPartition, offset)
}
} catch {
case e: NoSuchElementException => throw new
SystemConsumersException("can't register " + systemStreamPartition.getSystem +
"'s consumer.", e)
diff --git
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 15e2627..875c4a9 100644
---
a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++
b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -422,11 +422,6 @@ class TestSystemConsumers {
override def register(systemStreamPartition: SystemStreamPartition,
offset: String): Unit = {
super[BlockingEnvelopeMap].register(systemStreamPartition, offset)
}
-
- override def register(systemStreamPartition: SystemStreamPartition,
startpoint: Startpoint): Unit = {
- super[BlockingEnvelopeMap].register(systemStreamPartition, startpoint)
- }
-
}
}