bkonold commented on a change in pull request #1403:
URL: https://github.com/apache/samza/pull/1403#discussion_r463404039
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
return config;
}
+ protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+ return new Log4jSystemConfig(config);
+ }
+
+ protected StreamAppenderMetrics getMetrics(MetricsRegistryMap
metricsRegistry) {
+ return new StreamAppenderMetrics(appenderName, metricsRegistry);
+ }
+
+ protected void setupStream(SystemFactory systemFactory, String systemName) {
+ if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+ // Explicitly create stream appender stream with the partition count the
same as the number of containers.
+ System.out.println(String.format("[%s] creating stream ", appenderName)
+ streamName + " with partition count " + getPartitionCount());
+ StreamSpec streamSpec =
+ StreamSpec.createStreamAppenderStreamSpec(streamName, systemName,
getPartitionCount());
+
+ // SystemAdmin only needed for stream creation here.
+ SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+ systemAdmin.start();
+ systemAdmin.createStream(streamSpec);
+ systemAdmin.stop();
+ }
+ }
+
protected void setupSystem() {
config = getConfig();
- Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+ Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
if (streamName == null) {
streamName = getStreamName(log4jSystemConfig.getJobName(),
log4jSystemConfig.getJobId());
}
- // TODO we need the ACTUAL metrics registry, or the metrics won't get
reported by the metric reporters!
- MetricsRegistry metricsRegistry = new MetricsRegistryMap();
- metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+ // Instantiate metrics
+ MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
Review comment:
We're using the same registry; does the comment from before still apply?
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
##########
@@ -34,10 +34,22 @@
/** The number of log messages dropped e.g. because of buffer overflow. Does
not include recursive calls. */
public final Counter logMessagesDropped;
+ /** The size of log messages sent out to SystemProducer. */
+ public final Counter logMessagesBytes;
+
+ /** The number of log messages sent out to SystemProducer. */
+ public final Counter logMessagesCount;
+
+ /** The number of log messages cannot be sent out due to errors e.g.
serialization errors, system producer send errors. */
+ public final Counter logMessagesErrors;
+
public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
- super(prefix, registry);
+ super(prefix + "-", registry);
bufferFillPct = newGauge("buffer-fill-percent", 0);
recursiveCalls = newCounter("recursive-calls");
logMessagesDropped = newCounter("log-messages-dropped");
+ logMessagesBytes = newCounter("log-messages-bytes");
+ logMessagesCount = newCounter("log-messages-count");
Review comment:
Should this metric be named log-messages-sent instead?
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
protected static volatile boolean systemInitialized = false;
- private Config config = null;
+ protected Config config = null;
private SystemStream systemStream = null;
private SystemProducer systemProducer = null;
private String key = null;
- private String streamName = null;
+ protected String streamName = null;
+ protected String appenderName = null;
+ private String containerName = null;
private int partitionCount = 0;
private boolean isApplicationMaster;
private Serde<LogEvent> serde = null;
- private Logger log = LogManager.getLogger(StreamAppender.class);
+ protected Logger log = LogManager.getLogger(StreamAppender.class);
protected StreamAppenderMetrics metrics;
Review comment:
Can we organize these fields by access modifier for readability, so that
everything a subclass might modify can be seen in one place?
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -305,38 +318,58 @@ protected Config getConfig() {
return config;
}
+ protected Log4jSystemConfig getLog4jSystemConfig(Config config) {
+ return new Log4jSystemConfig(config);
+ }
+
+ protected StreamAppenderMetrics getMetrics(MetricsRegistryMap
metricsRegistry) {
+ return new StreamAppenderMetrics(appenderName, metricsRegistry);
+ }
+
+ protected void setupStream(SystemFactory systemFactory, String systemName) {
+ if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
+ // Explicitly create stream appender stream with the partition count the
same as the number of containers.
+ System.out.println(String.format("[%s] creating stream ", appenderName)
+ streamName + " with partition count " + getPartitionCount());
+ StreamSpec streamSpec =
+ StreamSpec.createStreamAppenderStreamSpec(streamName, systemName,
getPartitionCount());
+
+ // SystemAdmin only needed for stream creation here.
+ SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+ systemAdmin.start();
+ systemAdmin.createStream(streamSpec);
+ systemAdmin.stop();
+ }
+ }
+
protected void setupSystem() {
config = getConfig();
- Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+ Log4jSystemConfig log4jSystemConfig = getLog4jSystemConfig(config);
if (streamName == null) {
streamName = getStreamName(log4jSystemConfig.getJobName(),
log4jSystemConfig.getJobId());
}
- // TODO we need the ACTUAL metrics registry, or the metrics won't get
reported by the metric reporters!
- MetricsRegistry metricsRegistry = new MetricsRegistryMap();
- metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
+ // Instantiate metrics
+ MetricsRegistryMap metricsRegistry = new MetricsRegistryMap();
+ // Take this.getClass().getName() as the name to make it extend-friendly
+ metrics = getMetrics(metricsRegistry);
+ // Register metrics into metrics reporters so that they are able to be
reported to other systems: e.g. inGraphs
Review comment:
Let's keep mentions of things like "inGraphs" out of the OSS code.
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
##########
@@ -34,10 +34,22 @@
/** The number of log messages dropped e.g. because of buffer overflow. Does
not include recursive calls. */
public final Counter logMessagesDropped;
+ /** The size of log messages sent out to SystemProducer. */
+ public final Counter logMessagesBytes;
+
+ /** The number of log messages sent out to SystemProducer. */
+ public final Counter logMessagesCount;
+
+ /** The number of log messages cannot be sent out due to errors e.g.
serialization errors, system producer send errors. */
+ public final Counter logMessagesErrors;
+
public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
- super(prefix, registry);
+ super(prefix + "-", registry);
bufferFillPct = newGauge("buffer-fill-percent", 0);
recursiveCalls = newCounter("recursive-calls");
logMessagesDropped = newCounter("log-messages-dropped");
+ logMessagesBytes = newCounter("log-messages-bytes");
Review comment:
Should this metric be named log-messages-bytes-sent instead?
##########
File path:
samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
##########
@@ -84,15 +87,17 @@
protected static volatile boolean systemInitialized = false;
- private Config config = null;
+ protected Config config = null;
private SystemStream systemStream = null;
private SystemProducer systemProducer = null;
private String key = null;
- private String streamName = null;
+ protected String streamName = null;
+ protected String appenderName = null;
+ private String containerName = null;
private int partitionCount = 0;
private boolean isApplicationMaster;
private Serde<LogEvent> serde = null;
- private Logger log = LogManager.getLogger(StreamAppender.class);
+ protected Logger log = LogManager.getLogger(StreamAppender.class);
Review comment:
Why does this need to be protected? Can't subclasses have their own
logger?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]