eyalzek commented on issue #10406: URL: https://github.com/apache/airflow/issues/10406#issuecomment-678109841
For posterity: for anyone deploying to Kubernetes and using EFK for logging (specifically with https://github.com/fluent/fluentd-kubernetes-daemonset), this is the fluentd configuration we're currently using to get `log_id` and `offset` into worker log lines:

```
<filter kubernetes.var.log.containers.**>
  @type parser
  <parse>
    @type json
  </parse>
  emit_invalid_record_to_error false
  key_name log
  replace_invalid_sequence true
  reserve_data true
  reserve_time true
  remove_key_name_field true
</filter>

<filter var.log.containers.**>
  @type record_modifier
  prepare_value time = Time.now; @offset = time.to_i * (10 ** 9) + time.nsec
  remove_keys _dummy_
  <record>
    _dummy_ ${if record.has_key?('task_log'); record['log_id'] = "#{record['kubernetes']['labels']['dag_id']}-#{record['kubernetes']['labels']['task_id']}-#{record['kubernetes']['labels']['execution_date'].gsub(/_plus.+/, '').gsub(/[-\.]/, '_')}-#{record['kubernetes']['labels']['try_number']}"; record['offset'] = @offset; end; nil}
  </record>
</filter>
```

This is used in conjunction with the following Airflow configuration:

```
AIRFLOW__CORE__REMOTE_LOGGING: "True"
AIRFLOW__ELASTICSEARCH__HOST: http://elasticsearch:9200
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
AIRFLOW__ELASTICSEARCH__JSON_FIELDS: asctime, filename, lineno, levelname, message, task_log # task_log is used to tell task logs apart from airflow logs in fluentd
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
```
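To make the `record_modifier` expression easier to follow, here is a rough Python sketch of the same two computations: building `log_id` from the pod labels and producing a nanosecond `offset`. It is only an illustration, not part of the fluentd setup. The function names and the sample label values are hypothetical, and the sample `execution_date` assumes the label holds an ISO timestamp with `:` replaced by `_` and `+` replaced by `_plus_`.

```python
import re
import time


def build_log_id(labels: dict) -> str:
    """Build dag_id-task_id-execution_date-try_number from pod labels.

    Mirrors the two gsub calls in the fluentd expression.
    """
    # Drop the "_plus..." timezone suffix from the label value.
    execution_date = re.sub(r"_plus.+", "", labels["execution_date"])
    # Replace every '-' and '.' with '_'.
    execution_date = re.sub(r"[-.]", "_", execution_date)
    return "-".join(
        [
            labels["dag_id"],
            labels["task_id"],
            execution_date,
            str(labels["try_number"]),
        ]
    )


def nanosecond_offset() -> int:
    """Nanoseconds since the epoch, like time.to_i * 10**9 + time.nsec in Ruby."""
    return time.time_ns()


# Hypothetical label values, for illustration only.
sample_labels = {
    "dag_id": "example_dag",
    "task_id": "print_date",
    "execution_date": "2020-08-21T12_00_00.123456_plus_00_00",
    "try_number": "1",
}

log_id = build_log_id(sample_labels)
print(log_id)  # example_dag-print_date-2020_08_21T12_00_00_123456-1
print(nanosecond_offset())
```

Running a sample label set through a helper like this is a quick way to check that the generated `log_id` matches what the Airflow webserver queries Elasticsearch for.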
