eyalzek commented on issue #10406:
URL: https://github.com/apache/airflow/issues/10406#issuecomment-678109841


   For posterity, for anyone deploying to Kubernetes and using EFK for logging 
(specifically with https://github.com/fluent/fluentd-kubernetes-daemonset), 
this is the Fluentd configuration we're currently using to get 
`log_id` and `offset` into worker log lines:
   
   ```
   <filter kubernetes.var.log.containers.**>
     @type parser
     <parse>
       @type json
     </parse>
     emit_invalid_record_to_error false
     key_name log
     replace_invalid_sequence true
     reserve_data true
     reserve_time true
     remove_key_name_field true
   </filter>
   
   <filter var.log.containers.**>
     @type record_modifier
     prepare_value time = Time.now; @offset = time.to_i * (10 ** 9) + time.nsec
     remove_keys _dummy_
     <record>
       _dummy_ ${if record.has_key?('task_log'); record['log_id'] = "#{record['kubernetes']['labels']['dag_id']}-#{record['kubernetes']['labels']['task_id']}-#{record['kubernetes']['labels']['execution_date'].gsub(/_plus.+/, '').gsub(/[-\.]/, '_')}-#{record['kubernetes']['labels']['try_number']}"; record['offset'] = @offset; end; nil}
     </record>
   </filter>
   ```
   
   This is used in conjunction with the following Airflow configuration:
   ```
   AIRFLOW__CORE__REMOTE_LOGGING: "True"
   AIRFLOW__ELASTICSEARCH__HOST: http://elasticsearch:9200
   AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
   # task_log is used to tell task logs apart from airflow logs in fluentd
   AIRFLOW__ELASTICSEARCH__JSON_FIELDS: asctime, filename, lineno, levelname, message, task_log
   AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
   ```
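
To make the one-line `record_modifier` expression easier to follow, here is a rough standalone Ruby sketch of the same logic. The helper names (`build_log_id`, `nanosecond_offset`, `enrich`) and the sample label values are made up for illustration and are not part of Fluentd or Airflow. The sample `execution_date` assumes the pod label carries a `_plus_...` timezone suffix, which is what the first `gsub` in the filter strips.

```ruby
# Hypothetical helper mirroring the string built inside the record_modifier filter.
def build_log_id(labels)
  # Drop the "_plus..." timezone suffix, then replace '-' and '.' with '_'.
  execution_date = labels['execution_date'].gsub(/_plus.+/, '').gsub(/[-.]/, '_')
  "#{labels['dag_id']}-#{labels['task_id']}-#{execution_date}-#{labels['try_number']}"
end

# Nanoseconds since the epoch, the same arithmetic as the prepare_value line.
def nanosecond_offset(time = Time.now)
  time.to_i * (10**9) + time.nsec
end

# Only records that carry a task_log key are enriched; others pass through untouched.
def enrich(record, time = Time.now)
  return record unless record.key?('task_log')

  record['log_id'] = build_log_id(record['kubernetes']['labels'])
  record['offset'] = nanosecond_offset(time)
  record
end

# Sample record with assumed (illustrative) label values.
sample = {
  'task_log' => 'placeholder', # only the presence of the key is checked
  'message' => 'hello from a task',
  'kubernetes' => {
    'labels' => {
      'dag_id' => 'example_dag',
      'task_id' => 'example_task',
      'execution_date' => '2020-08-21T10_00_00_plus_00_00',
      'try_number' => '1'
    }
  }
}

enriched = enrich(sample)
puts enriched['log_id'] # => example_dag-example_task-2020_08_21T10_00_00-1
puts enriched['offset'] # nanosecond timestamp, varies per run
```

Note that in the actual filter `@offset` is set once in `prepare_value`, whereas this sketch computes the offset per call; treat it as an aid for reading the expression rather than a drop-in replacement.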

