[
https://issues.apache.org/jira/browse/SAMZA-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shanthoosh Venkataraman resolved SAMZA-2167.
--------------------------------------------
Resolution: Fixed
> Should not close the MetadataStore after generating JobModel in
> ProcessJobFactory.
> ----------------------------------------------------------------------------------
>
> Key: SAMZA-2167
> URL: https://issues.apache.org/jira/browse/SAMZA-2167
> Project: Samza
> Issue Type: Improvement
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently, ProcessJobFactory closes the metadata-store connection after
> generating the JobModel.
> To read from the coordinator stream only once in the samza-yarn ApplicationMaster, we
> ended up making the LocalityManager, TaskAssignmentManager,
> ChangelogStreamManager etc. However, after the above change, closing the
> metadata store in ProcessJobFactory after generating the JobModel results in
> the following exception when the servlet API is hit to query the JobModel:
> {code:java}
> org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
>   at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
>   at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
>   at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
>   at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
>   at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
>   at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
>   at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
>   at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
>   at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
>   at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
>   at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>   at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
>   at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
>   at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
>   at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
>   at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
>   at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>   at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
>   at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
>   at org.eclipse.jetty.server.Server.handle(Server.java:497)
>   at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
>   at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
>   at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
>   at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>   at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
>   at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
>   at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
>   at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
>   at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
>   at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
>   at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
>   at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
>   at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
>   at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
>   at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
>   at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
>   at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
>   ... 25 more
> 2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] /
> org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
>   at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
>   at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
>   at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
>   at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
>   at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
>   at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
>   at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
>   at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
>   at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
>   at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
>   at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>   at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
>   at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
>   at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
>   at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
>   at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
>   at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>   at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
>   at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
>   at org.eclipse.jetty.server.Server.handle(Server.java:497)
>   at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
>   at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
>   at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
>   at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>   at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
>   at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
>   at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
>   at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
>   at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
>   at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
>   at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
>   at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
>   at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
>   at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
>   at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
>   at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
>   at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
>   ... 25 more
> 2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send response error 500: org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
> {code}
> The above exception causes the local deployment to fail.
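
The failure mode in the trace above (a getter on the JobModel reading through a store that has already been stopped) can be illustrated with a minimal sketch. Every name here (SketchStore, SketchJobModel, MetadataStoreLifecycleSketch) is hypothetical and invented for illustration; none of it is Samza's actual MetadataStore, JobModel, or ProcessJobFactory code, and the real fix may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a metadata store; NOT Samza's MetadataStore interface.
final class SketchStore implements AutoCloseable {
    private final Map<String, String> entries = new HashMap<>();
    private boolean closed = false;

    void put(String key, String value) {
        ensureOpen();
        entries.put(key, value);
    }

    // Returns a copy of every entry; fails once the store has been closed.
    Map<String, String> all() {
        ensureOpen();
        return new HashMap<>(entries);
    }

    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("store has stopped");
        }
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical model whose getter reads from the store on every call,
// mirroring the lazy read visible in the stack trace.
final class SketchJobModel {
    private final SketchStore store;

    SketchJobModel(SketchStore store) {
        this.store = store;
    }

    Map<String, String> getAllContainerLocality() {
        return store.all();
    }
}

final class MetadataStoreLifecycleSketch {
    // Failing pattern: the store is closed as soon as the model is built.
    static SketchJobModel buildAndClose(SketchStore store) {
        SketchJobModel model = new SketchJobModel(store);
        store.close();
        return model;
    }

    // Alternative: leave the store open; the caller closes it at shutdown.
    static SketchJobModel buildAndKeepOpen(SketchStore store) {
        return new SketchJobModel(store);
    }

    // True when a later read through the model throws.
    static boolean readFails(SketchJobModel model) {
        try {
            model.getAllContainerLocality();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SketchStore first = new SketchStore();
        first.put("0", "host-a");
        System.out.println(readFails(buildAndClose(first)));     // prints true

        SketchStore second = new SketchStore();
        second.put("0", "host-a");
        System.out.println(readFails(buildAndKeepOpen(second))); // prints false
        second.close(); // closed only once nothing reads through the model anymore
    }
}
```

In this sketch the store's owner, not the code that builds the model, decides when to close it, which is the behavior the issue title asks for.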