[
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313362#comment-16313362
]
Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM:
-----------------------------------------------------------
Here are my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka source delivering raw JSON strings.
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")
    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))

    // Elasticsearch 5 sink; each record becomes one index request.
    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest =
          Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        // Rename fields, convert epoch timestamps to formatted UTC strings,
        // and strip leading underscores from the remaining field names.
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => "" // unparsable records become empty strings, filtered out below
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>test.streaming</groupId>
  <artifactId>test</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <flink.version>1.4.0</flink.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-contrib_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-native_2.11</artifactId>
      <version>3.5.3</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <configuration>
          <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>test.streaming.Test</mainClass>
            </transformer>
          </transformers>
          <finalName>${project.artifactId}</finalName>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
{code}
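
As a side note for anyone hitting the same Jackson conflict: one possible workaround, sketched below and not verified against this exact job, is to relocate Jackson inside the shaded jar so the Elasticsearch client no longer resolves it from the Hadoop classpath. Only the {{relocations}} element is new here; the plugin coordinates are the same as in the pom above.
{code:title=pom.xml (shade relocation sketch)}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <configuration>
    <relocations>
      <!-- Rewrite the Jackson classes bundled into the fat jar (pulled in
           transitively by the Elasticsearch 5 client) and every reference to
           them, so Hadoop's jackson-core-2.2.3.jar can no longer shadow them
           under parent-first class loading. -->
      <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
{code}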
> Conflict jackson library with ElasticSearch connector
> -----------------------------------------------------
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
> Issue Type: Bug
> Components: ElasticSearch Connector, Startup Shell Scripts
> Affects Versions: 1.4.0
> Reporter: Jihyun Cho
> Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> My Flink job fails after updating Flink to version 1.4.0. It uses the
> ElasticSearch connector.
> I'm running on CDH Hadoop with the Flink option "classloader.resolve-order:
> parent-first".
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager - OS current user: www
> 2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Current Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: (not set)
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Xms32768M
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Program Arguments:
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     --configDir
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Classpath: ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
> ....
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Filter -> Map -> Filter -> Sink: Unnamed (3/10) (fb33a6e0c1a7e859eaef9cf8bcf4565e) switched from RUNNING to FAILED.
> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>     at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:76)
>     at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:59)
>     at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:726)
>     at org.elasticsearch.common.settings.Setting.lambda$listSetting$26(Setting.java:672)
>     at org.elasticsearch.common.settings.Setting$2.getRaw(Setting.java:676)
>     at org.elasticsearch.common.settings.Setting.lambda$listSetting$24(Setting.java:660)
>     at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:665)
>     at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:660)
>     at org.elasticsearch.common.network.NetworkService.<clinit>(NetworkService.java:50)
>     at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:91)
>     at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:119)
>     at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:247)
>     at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:125)
>     at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:111)
>     at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:101)
>     at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:73)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Filter -> Map -> Filter -> Sink: Unnamed (8/10) (e12caa9cc12027738e2426d3a3641bba) switched from RUNNING to FAILED.
> java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.common.network.NetworkService
>     at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:91)
>     at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:119)
>     at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:247)
>     at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:125)
>     at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:111)
>     at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:101)
>     at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:73)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The field "FAIL_ON_SYMBOL_HASH_OVERFLOW" was added in Jackson 2.4, but CDH
> Hadoop ships Jackson 2.2, so the two versions conflict on the classpath (see
> the diagnostic sketch below).
> I reverted the changes of https://issues.apache.org/jira/browse/FLINK-7477,
> and the problem disappeared.
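
To make the version conflict described above easy to confirm, here is a minimal diagnostic sketch. It is not part of the job; {{JacksonProbe}} is a hypothetical helper name, and only the class and field names come from the stack trace above. Run with the same classpath as the TaskManager, it prints which jar actually supplies Jackson and whether the 2.4+ field exists:
{code:title=JacksonProbe.scala (diagnostic sketch)}
// Hypothetical diagnostic; assumes only that some Jackson version is on the classpath.
object JacksonProbe {
  def main(args: Array[String]): Unit = {
    // JsonFactory.Feature is the enum that gained FAIL_ON_SYMBOL_HASH_OVERFLOW in Jackson 2.4.
    val feature = Class.forName("com.fasterxml.jackson.core.JsonFactory$Feature")
    // Jar that actually supplied the class (code source can be null for bootstrap classes).
    val location = Option(feature.getProtectionDomain.getCodeSource).map(_.getLocation)
    println(s"JsonFactory.Feature loaded from: ${location.getOrElse("<unknown>")}")
    // Enum constants are public static fields, so getField finds them on 2.4+.
    val present =
      try { feature.getField("FAIL_ON_SYMBOL_HASH_OVERFLOW"); true }
      catch { case _: NoSuchFieldException => false }
    println(s"FAIL_ON_SYMBOL_HASH_OVERFLOW present: $present")
  }
}
{code}
With "classloader.resolve-order: parent-first" this should point at the CDH jackson-core-2.2.3.jar and print false, matching the NoSuchFieldError above; with the default child-first order the Jackson bundled in the job jar would win instead.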
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)