[ https://issues.apache.org/jira/browse/FLINK-2292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627807#comment-14627807 ]

ASF GitHub Bot commented on FLINK-2292:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34662347
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.accumulators;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.accumulators.Accumulator;
    +import org.apache.flink.api.common.accumulators.LongCounter;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +
    +/**
    + * Main accumulator registry which encapsulates internal and user-defined accumulators.
    + */
    +public class AccumulatorRegistry {
    +
    +   protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);
    +
    +   protected final JobID jobID;
    +   protected final ExecutionAttemptID taskID;
    +
    +   /* Flink's internal Accumulator values stored for the executing task. */
    +   private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
    +                   new HashMap<Metric, Accumulator<?, ?>>();
    +
    +   /* User-defined Accumulator values stored for the executing task. */
    +   private final Map<String, Accumulator<?, ?>> userAccumulators =
    +                   Collections.synchronizedMap(new HashMap<String, Accumulator<?, ?>>());
    +
    +   /* The reporter returned to reporting tasks. */
    +   private final ReadWriteReporter reporter;
    +
    +   /**
    +    * Flink metrics supported
    +    */
    +   public enum Metric {
    +           NUM_RECORDS_IN,
    +           NUM_RECORDS_OUT,
    +           NUM_BYTES_IN,
    +           NUM_BYTES_OUT
    +   }
    +
    +
    +   public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
    +           this.jobID = jobID;
    +           this.taskID = taskID;
    +           this.reporter = new ReadWriteReporter(flinkAccumulators);
    +   }
    +
    +   /**
    +    * Creates a snapshot of this accumulator registry.
    +    * @return a serialized accumulator map
    +    */
    +   public AccumulatorSnapshot getSnapshot() {
    +           try {
    +                   return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
    +           } catch (IOException e) {
    +                   LOG.warn("Failed to serialize accumulators for task.", e);
    +                   return null;
    +           }
    +   }
    +
    +   /**
    +    * Gets the map for user-defined accumulators.
    +    */
    +   public Map<String, Accumulator<?, ?>> getUserMap() {
    +           return userAccumulators;
    +   }
    +
    +   /**
    +    * Gets the reporter for flink internal metrics.
    +    */
    +   public Reporter getReadWriteReporter() {
    +           return reporter;
    +   }
    +
    +   /**
    +    * Interface for Flink's internal accumulators.
    +    */
    +   public interface Reporter {
    +           void report(Metric metric, long value);
    +   }
    +
    +   /**
    +    * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
    +    */
    +   public static class ReadWriteReporter implements Reporter {
    +
    +           private LongCounter numRecordsIn = new LongCounter();
    +           private LongCounter numRecordsOut = new LongCounter();
    +           private LongCounter numBytesIn = new LongCounter();
    +           private LongCounter numBytesOut = new LongCounter();
    +
    +           public ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
    +                   accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
    +                   accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
    +                   accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
    +                   accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
    +           }
    +
    +           @Override
    +           public void report(Metric metric, long value) {
    --- End diff --
    
    This is called extremely often - once per tuple. It needs to have virtually zero overhead.
    
    To achieve that, I would add dedicated methods for each metric, rather than dispatching on the enumeration.
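
For illustration, here is a minimal sketch of what such dedicated per-metric methods could look like, reusing the LongCounter fields and the Metric map from the diff above (the method names are illustrative only, not necessarily the API that was ultimately committed):

    public interface Reporter {
            void reportNumRecordsIn(long value);
            void reportNumRecordsOut(long value);
            void reportNumBytesIn(long value);
            void reportNumBytesOut(long value);
    }

    public static class ReadWriteReporter implements Reporter {

            private final LongCounter numRecordsIn = new LongCounter();
            private final LongCounter numRecordsOut = new LongCounter();
            private final LongCounter numBytesIn = new LongCounter();
            private final LongCounter numBytesOut = new LongCounter();

            public ReadWriteReporter(Map<Metric, Accumulator<?, ?>> accumulatorMap) {
                    accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
                    accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
                    accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
                    accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
            }

            // Each call is a plain counter increment on the hot per-record path;
            // no enum dispatch or map lookup is involved.
            @Override
            public void reportNumRecordsIn(long value) {
                    numRecordsIn.add(value);
            }

            @Override
            public void reportNumRecordsOut(long value) {
                    numRecordsOut.add(value);
            }

            @Override
            public void reportNumBytesIn(long value) {
                    numBytesIn.add(value);
            }

            @Override
            public void reportNumBytesOut(long value) {
                    numBytesOut.add(value);
            }
    }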


> Report accumulators periodically while job is running
> -----------------------------------------------------
>
>                 Key: FLINK-2292
>                 URL: https://issues.apache.org/jira/browse/FLINK-2292
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
>
> Accumulators should be sent periodically, as part of the heartbeat that sends 
> metrics. This allows them to be updated in real time.
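
As a rough, self-contained sketch of the periodic reporting flow described above (the SnapshotConsumer callback, the freshly generated IDs, and the 10-second interval are assumptions for illustration only; they are not Flink's actual heartbeat code):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
    import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PeriodicAccumulatorReport {

            /** Hypothetical stand-in for whatever transport carries the heartbeat. */
            interface SnapshotConsumer {
                    void send(AccumulatorSnapshot snapshot);
            }

            public static void main(String[] args) {
                    // Fresh IDs just for the example; a task would use its real job/attempt IDs.
                    final AccumulatorRegistry registry =
                                    new AccumulatorRegistry(new JobID(), new ExecutionAttemptID());

                    // Stub consumer; the runtime would instead attach the snapshot to the
                    // heartbeat message sent to the JobManager.
                    final SnapshotConsumer consumer = new SnapshotConsumer() {
                            @Override
                            public void send(AccumulatorSnapshot snapshot) {
                                    System.out.println("Shipping accumulator snapshot: " + snapshot);
                            }
                    };

                    ScheduledExecutorService scheduler =
                                    Executors.newSingleThreadScheduledExecutor();

                    // Periodically snapshot all accumulators and ship them.
                    scheduler.scheduleAtFixedRate(new Runnable() {
                            @Override
                            public void run() {
                                    AccumulatorSnapshot snapshot = registry.getSnapshot();
                                    if (snapshot != null) {
                                            consumer.send(snapshot);
                                    }
                            }
                    }, 0, 10, TimeUnit.SECONDS);
            }
    }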



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
