Shanthoosh Venkataraman created SAMZA-2167:
----------------------------------------------
Summary: Do not close the metadata store at the end of
ProcessJobFactory.
Key: SAMZA-2167
URL: https://issues.apache.org/jira/browse/SAMZA-2167
Project: Samza
Issue Type: Improvement
Reporter: Shanthoosh Venkataraman
Assignee: Shanthoosh Venkataraman
Currently, ProcessJobFactory closes the metadata-store connection after
generating the JobModel.
To read from the coordinator stream only once in the samza-yarn
ApplicationMaster, we made the LocalityManager, TaskAssignmentManager,
ChangelogStreamManager, etc. use the metadata store. After that change,
closing the metadata store in ProcessJobFactory after generating the JobModel
results in the following exception when the servlet API queries the JobModel:
{code:java}
org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:497)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
... 25 more
2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] /
org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
at org.eclipse.jetty.server.Server.handle(Server.java:497)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
... 25 more
2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send response error 500: org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
{code}
The above exception causes the local deployment to fail.
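
For illustration only, the failure mode can be reproduced in miniature. The stack trace suggests that JobModel.getAllContainerLocality() reads container locality through the store each time it is called, so a model that outlives the store fails on its next read. The sketch below is not Samza code: InMemoryStore, LazyJobModel and generateJobModel are hypothetical stand-ins, and an IllegalStateException stands in for the SamzaException raised by the stopped consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the lifecycle problem; every name here is hypothetical.
final class MetadataStoreLifecycleSketch {

    /** Stand-in for a metadata store backed by the coordinator stream. */
    static final class InMemoryStore implements AutoCloseable {
        private final Map<String, String> data = new HashMap<>();
        private boolean closed = false;

        void put(String key, String value) {
            data.put(key, value);
        }

        /** Returns a copy of all entries, or fails once the store is closed. */
        Map<String, String> all() {
            if (closed) {
                // Plays the role of "KafkaConsumerProxy has stopped" in the trace.
                throw new IllegalStateException("metadata store is closed");
            }
            return new HashMap<>(data);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for a job model that reads locality lazily, on every call. */
    static final class LazyJobModel {
        private final InMemoryStore store;

        LazyJobModel(InMemoryStore store) {
            this.store = store;
        }

        Map<String, String> getAllContainerLocality() {
            return store.all();
        }
    }

    /** Stand-in for job model generation: the model keeps a reference to the store. */
    static LazyJobModel generateJobModel(InMemoryStore store) {
        return new LazyJobModel(store);
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.put("0", "host-a");

        LazyJobModel model = generateJobModel(store);
        System.out.println("before close: " + model.getAllContainerLocality());

        // Closing right after generating the model is the step this issue objects to.
        store.close();
        try {
            model.getAllContainerLocality();
        } catch (IllegalStateException e) {
            System.out.println("after close: " + e.getMessage());
        }
    }
}
```

Under these assumptions, the read before close() succeeds and the read after it throws, which matches the order of events described above: the servlet serializes the JobModel only after the factory has already closed the store.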