Shanthoosh Venkataraman created SAMZA-2167:
----------------------------------------------

             Summary: Do not close the metadata store at the end of 
ProcessJobFactory .
                 Key: SAMZA-2167
                 URL: https://issues.apache.org/jira/browse/SAMZA-2167
             Project: Samza
          Issue Type: Improvement
            Reporter: Shanthoosh Venkataraman
            Assignee: Shanthoosh Venkataraman




Currently in ProcessJobFactory the metadata-store connection is closed after 
generating the JobModel. 

To read from coordinator stream only once in samza-yarn ApplicationMaster, we 
ended up making the LocalityManager, TaskAssignmentManager, 
ChanglogStreamManager etc.  However, after the above change closing the 
metadata store in ProcessJobFactory after generating the JobModel, results in 
the following exception when the servlet API queries the JobModel:


{code:java}
org.codehaus.jackson.map.JsonMappingException: 
samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has 
stopped. (through reference chain: 
org.apache.samza.job.model.JobModel["all-container-locality"])
        at 
org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
        at 
org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
        at 
org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
        at 
org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
        at 
org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
        at 
org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
        at 
org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
        at 
org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
        at 
org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
        at 
org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
        at 
org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at 
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
        at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
        at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
        at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
        at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
        at org.eclipse.jetty.server.Server.handle(Server.java:497)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
        at 
org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.samza.SamzaException: 
samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has 
stopped.
        at 
org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
        at 
org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
        at 
org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
        at 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
        at 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
        at 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
        at 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
        at 
org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
        at 
org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
        at 
org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at 
org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
        at 
org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
        at 
org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
        ... 25 more
2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] /
org.codehaus.jackson.map.JsonMappingException: 
samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has 
stopped. (through reference chain: 
org.apache.samza.job.model.JobModel["all-container-locality"])
        at 
org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
        at 
org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
        at 
org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
        at 
org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
        at 
org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
        at 
org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
        at 
org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
        at 
org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
        at 
org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
        at 
org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
        at 
org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at 
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
        at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
        at 
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
        at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
        at 
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
        at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
        at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
        at org.eclipse.jetty.server.Server.handle(Server.java:497)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
        at 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
        at 
org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
        at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.samza.SamzaException: 
samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has 
stopped.
        at 
org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
        at 
org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
        at 
org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
        at 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
        at 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
        at 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
        at 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
        at 
org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
        at 
org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
        at 
org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at 
org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
        at 
org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
        at 
org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
        ... 25 more
2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send 
response error 500: org.codehaus.jackson.map.JsonMappingException: 
samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has 
stopped. (through reference chain: 
org.apache.samza.job.model.JobModel["all-container-locality"])

{code}


The above exception causes the local deployment to fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to