ibuda commented on issue #3728:
URL: https://github.com/apache/hudi/issues/3728#issuecomment-929883637


   Thank you @danny0405 for the link.
   
   I am trying to set up an AWS Kinesis Data Analytics application, and as a starting point I 
used the code you provided in the link.
   Even though I used the code below, Kinesis complains about the Hudi connector; 
the error log is included below. Could this be caused by a Flink version conflict? Kinesis runs 
Apache Flink 1.11. As a follow-up question: if this is indeed a version 
conflict, does it mean I won't be able to use Hudi within a Kinesis application on AWS?
   
   Kinesis Application Error log:
   ```
   org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
   org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
   com.amazonaws.services.kinesisanalytics.S3StreamingSinkJob.execInsertSql(S3StreamingSinkJob.java:63)
   com.amazonaws.services.kinesisanalytics.S3StreamingSinkJob.main(S3StreamingSinkJob.java:59)
   java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   java.base/java.lang.reflect.Method.invoke(Method.java:566)
   org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:294)
   ... 12 more
   Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='hudi''.
   ```
   
   Here is the code I use (the pom.xml file follows below):
   ```
   package com.amazonaws.services.kinesisanalytics;

   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   import org.apache.flink.table.api.TableResult;
   import org.apache.flink.table.api.config.ExecutionConfigOptions;
   import org.apache.flink.table.api.internal.TableEnvironmentImpl;

   import java.util.concurrent.ExecutionException;

   public class S3StreamingSinkJob {

       static TableEnvironment streamTableEnv;

       public static void main(String[] args) throws Exception {
           EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
           streamTableEnv = TableEnvironmentImpl.create(settings);
           streamTableEnv.getConfig().getConfiguration()
                   .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
           streamTableEnv.getConfig().getConfiguration()
                   .setString("execution.checkpointing.interval", "2s");
           String hoodieTableDDL = "create table t1(\n"
                   + "  uuid varchar(20),\n"
                   + "  name varchar(10),\n"
                   + "  age int,\n"
                   + "  `partition` varchar(20),\n" // test streaming read with partition field in the middle
                   + "  ts timestamp(3),\n"
                   + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
                   + ")\n"
                   + "PARTITIONED BY (`partition`)\n"
                   + "with (\n"
                   + "  'connector' = 'hudi',\n"
                   + "  'path' = 's3://bucket-name/hudi/sql-api/',\n"
                   + "  'read.streaming.enabled' = 'true'\n"
                   + ")";

           streamTableEnv.executeSql(hoodieTableDDL);
           String insertInto = "insert into t1 values\n"
                   + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n"
                   + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n"
                   + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n"
                   + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n"
                   + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n"
                   + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n"
                   + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n"
                   + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')";
           execInsertSql(streamTableEnv, insertInto);
       }

       static void execInsertSql(TableEnvironment tEnv, String insert) {
           TableResult tableResult = tEnv.executeSql(insert);
           // wait for the insert job to finish
           try {
               tableResult.getJobClient().get().getJobExecutionResult().get();
           } catch (InterruptedException | ExecutionException ex) {
               throw new RuntimeException(ex);
           }
       }
   }
   ```
   
   Here is the pom.xml I used to build the project with Maven:
   ```
   <?xml version="1.0" encoding="UTF-8"?>
   <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>

       <groupId>com.amazonaws</groupId>
       <artifactId>aws-kinesis-analytics-java-apps</artifactId>
       <version>1.0</version>
       <packaging>jar</packaging>

       <properties>
           <java.version>11</java.version>
           <scala.binary.version>2.11</scala.binary.version>
           <flink.version>1.12.2</flink.version>
           <hudi.version>0.9.0</hudi.version>
           <kda.version>2.0.0</kda.version>
           <kda.runtime.version>1.2.0</kda.runtime.version>
       </properties>

       <dependencyManagement>
           <dependencies>
               <dependency>
                   <groupId>com.amazonaws</groupId>
                   <artifactId>aws-java-sdk-bom</artifactId>
                   <!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
                   <version>1.11.903</version>
                   <type>pom</type>
                   <scope>import</scope>
               </dependency>
           </dependencies>
       </dependencyManagement>

       <dependencies>
           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-kinesisanalytics-runtime</artifactId>
               <version>${kda.runtime.version}</version>
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-kinesisanalytics-flink</artifactId>
               <version>${kda.version}</version>
           </dependency>

           <dependency>
               <groupId>com.amazonaws</groupId>
               <artifactId>aws-java-sdk-logs</artifactId>
               <!-- scope>compile</scope -->
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-common</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-csv</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
               <version>${flink.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.hudi</groupId>
               <artifactId>hudi-common</artifactId>
               <version>${hudi.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.hudi</groupId>
               <artifactId>hudi-client-common</artifactId>
               <version>${hudi.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.hudi</groupId>
               <artifactId>hudi-flink-client</artifactId>
               <version>${hudi.version}</version>
               <scope>provided</scope>
           </dependency>

           <dependency>
               <groupId>org.apache.hudi</groupId>
               <artifactId>hudi-sync-common</artifactId>
               <version>${hudi.version}</version>
               <scope>provided</scope>
           </dependency>
       </dependencies>

       <build>
           <plugins>
               <!-- Java Compiler -->
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.8.1</version>
                   <configuration>
                       <release>11</release>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
               <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-shade-plugin</artifactId>
                   <version>3.2.1</version>
                   <executions>
                       <!-- Run shade goal on package phase -->
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                               <artifactSet>
                                   <excludes>
                                       <exclude>org.apache.flink:force-shading</exclude>
                                       <exclude>com.google.code.findbugs:jsr305</exclude>
                                       <exclude>org.slf4j:*</exclude>
                                       <exclude>log4j:*</exclude>
                                   </excludes>
                               </artifactSet>
                               <filters>
                                   <filter>
                                       <!-- Do not copy the signatures in the META-INF folder.
                                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                       <artifact>*:*</artifact>
                                       <excludes>
                                           <exclude>META-INF/*.SF</exclude>
                                           <exclude>META-INF/*.DSA</exclude>
                                           <exclude>META-INF/*.RSA</exclude>
                                       </excludes>
                                   </filter>
                               </filters>
                               <transformers combine.children="append">
                                   <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                       <mainClass>com.amazonaws.services.kinesisanalytics.S3StreamingSinkJob</mainClass>
                                   </transformer>
                                   <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                               </transformers>
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
   </project>
   ```
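
   One thing I am wondering about myself: the Hudi artifacts in the pom are all declared with `provided` scope, so the shade plugin would leave them out of the fat jar, and as far as I understand Flink discovers table connectors via `META-INF/services` entries on the job classpath. If that is the cause, would depending on the packaged bundle instead be the right fix? A rough sketch of what I mean (I am assuming the artifact name `hudi-flink-bundle_${scala.binary.version}` here, which I have not verified for this setup):
   ```
   <dependency>
       <groupId>org.apache.hudi</groupId>
       <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
       <version>${hudi.version}</version>
       <!-- no provided scope, so the connector and its service files end up in the fat jar -->
   </dependency>
   ```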

