This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c49a218 SAMZA-2071: Skip StartpointManager instantiation for
PassThroughCoordinator. (#933)
c49a218 is described below
commit c49a218f7126b6cb45d61551bf94c83fa195f56b
Author: shanthoosh <[email protected]>
AuthorDate: Thu Feb 28 15:24:37 2019 -0800
SAMZA-2071: Skip StartpointManager instantiation for
PassThroughCoordinator. (#933)
---
.../main/scala/org/apache/samza/container/SamzaContainer.scala | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 094f49a..f8d799c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -426,18 +426,20 @@ object SamzaContainer extends Logging {
.orNull
info("Got checkpoint manager: %s" format checkpointManager)
- val metadataStoreFactory = Option(config.getStartpointMetadataStoreFactory)
+ val startpointMetadataStoreFactory =
Option(config.getStartpointMetadataStoreFactory)
.map(Util.getObj(_, classOf[MetadataStoreFactory]))
.orNull
- val startpointManager = {
+ val startpointManager = if (startpointMetadataStoreFactory != null) {
try {
- Option(new StartpointManager(metadataStoreFactory, config,
samzaContainerMetrics.registry))
+ Option(new StartpointManager(startpointMetadataStoreFactory, config,
samzaContainerMetrics.registry))
} catch {
case e: Exception => {
error("Unable to get an instance of the StartpointManager.
Continuing without one.", e)
None
}
}
+ } else {
+ None
}
// create a map of consumers with callbacks to pass to the OffsetManager