shanthoosh commented on a change in pull request #987: SAMZA-2158: Remove the
redunant coordinator stream reads in the ApplicationMaster startup sequence.
URL: https://github.com/apache/samza/pull/987#discussion_r273225617
##########
File path:
samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
##########
@@ -46,38 +49,42 @@
import
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.util.CoordinatorStreamUtil;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An implementation of the {@link MetadataStore} interface where the metadata
of the Samza job is stored in coordinator stream.
+ * An implementation of the {@link MetadataStore} interface where the metadata
of the samza job is stored in coordinator stream.
*
* This class is thread safe.
+ *
+ * It is recommended to use {@link NamespaceAwareCoordinatorStreamStore}. This
will enable the single CoordinatorStreamStore connection
+ * to be shared by the multiple {@link NamespaceAwareCoordinatorStreamStore}
instances.
*/
public class CoordinatorStreamStore implements MetadataStore {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorStreamStore.class);
private static final String SOURCE = "SamzaContainer";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final Config config;
private final SystemStream coordinatorSystemStream;
private final SystemStreamPartition coordinatorSystemStreamPartition;
private final SystemProducer systemProducer;
private final SystemConsumer systemConsumer;
private final SystemAdmin systemAdmin;
- private final String type;
- private final CoordinatorStreamKeySerde keySerde;
- private final Map<String, byte[]> bootstrappedMessages = new HashMap<>();
+ // Namespaced key to the message byte array.
+ private final Map<String, byte[]> bootstrappedMessages = new
ConcurrentHashMap<>();
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services