[ https://issues.apache.org/jira/browse/FLINK-3872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324346#comment-15324346 ]

ASF GitHub Bot commented on FLINK-3872:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66603012
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
                }
        }
     
    +   /**
    +    * Runs a table source test with JSON data.
    +    *
    +    * The table source needs to parse the following JSON fields:
    +    * - "long" -> number
    +    * - "string" -> "string"
    +    * - "boolean" -> true|false
    +    * - "double" -> fraction
    +    */
    +   public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
    +           final ObjectMapper mapper = new ObjectMapper();
    +
    +           final int numElements = 1024;
    +           final long[] longs = new long[numElements];
    +           final String[] strings = new String[numElements];
    +           final boolean[] booleans = new boolean[numElements];
    +           final double[] doubles = new double[numElements];
    +
    +           final byte[][] serializedJson = new byte[numElements][];
    +
    +           ThreadLocalRandom random = ThreadLocalRandom.current();
    +
    +           for (int i = 0; i < numElements; i++) {
    +                   longs[i] = random.nextLong();
    +                   strings[i] = Integer.toHexString(random.nextInt());
    +                   booleans[i] = random.nextBoolean();
    +                   doubles[i] = random.nextDouble();
    +
    +                   ObjectNode entry = mapper.createObjectNode();
    +                   entry.put("long", longs[i]);
    +                   entry.put("string", strings[i]);
    +                   entry.put("boolean", booleans[i]);
    +                   entry.put("double", doubles[i]);
    +
    +                   serializedJson[i] = mapper.writeValueAsBytes(entry);
    +           }
    +
    +           // Produce serialized JSON data
    +           createTestTopic(topic, 1, 1);
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment
    +                           .createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +
    +           env.addSource(new SourceFunction<byte[]>() {
    +                   @Override
    +                   public void run(SourceContext<byte[]> ctx) throws Exception {
    +                           for (int i = 0; i < numElements; i++) {
    +                                   ctx.collect(serializedJson[i]);
    +                           }
    +                   }
    +
    +                   @Override
    +                   public void cancel() {
    +                   }
    +           }).addSink(kafkaServer.getProducer(
    +                           topic,
    +                           new ByteArraySerializationSchema(),
    +                           standardProps,
    +                           null));
    +
    +           // execute() blocks until the finite source above is exhausted
    +           env.execute();
    +
    +           // Register as table source
    +           StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
    +           tableEnvironment.registerTableSource("kafka", kafkaTableSource);
    +
    +           Table result = tableEnvironment.ingest("kafka");
    +
    +           tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
    +
    +                   int i = 0;
    +
    +                   @Override
    +                   public void invoke(Row value) throws Exception {
    +                           if (i > numElements) {
    +                                   throw new IllegalStateException("Received too many rows.");
    +                           }
    +
    +                           assertEquals(longs[i], value.productElement(0));
    +                           assertEquals(strings[i], value.productElement(1));
    +                           assertEquals(booleans[i], value.productElement(2));
    +                           assertEquals(doubles[i], value.productElement(3));
    +
    +                           if (i == numElements-1) {
    +                                   throw new SuccessException();
    --- End diff --
    
    Doesn't this prevent checking whether the source emits too many records? I.e., the check in line 817 would never evaluate to `true`, right?
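
    For illustration, here is a condensed restatement of the sink's control flow as a standalone class (hypothetical: the class name, the constructor, and the placement of the `i++` are assumptions, since the diff is cut off before the increment; the field names follow the diff above). It makes explicit why the "too many rows" guard can never fire once `SuccessException` ends the job on the last expected row:

        import org.apache.flink.api.table.Row;
        import org.apache.flink.streaming.api.functions.sink.SinkFunction;
        import org.apache.flink.test.util.SuccessException;

        import static org.junit.Assert.assertEquals;

        // Hypothetical sketch, not part of the PR.
        class RowVerifyingSink implements SinkFunction<Row> {

                private final long[] longs;      // expected values, generated as above
                private final int numElements;
                private int i = 0;

                RowVerifyingSink(long[] longs, int numElements) {
                        this.longs = longs;
                        this.numElements = numElements;
                }

                @Override
                public void invoke(Row value) throws Exception {
                        if (i > numElements) {
                                // The review comment's point: this guard is unreachable,
                                // because the SuccessException below already stops the
                                // job on the last expected row, before any surplus row
                                // could reach this sink.
                                throw new IllegalStateException("Received too many rows.");
                        }

                        assertEquals(longs[i], value.productElement(0));
                        // ... remaining field assertions as in the diff ...

                        if (i == numElements - 1) {
                                throw new SuccessException(); // job ends here on the final row
                        }
                        i++; // placement assumed; the diff ends before the increment
                }
        }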


> Add Kafka TableSource with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3872
>                 URL: https://issues.apache.org/jira/browse/FLINK-3872
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Ufuk Celebi
>             Fix For: 1.1.0
>
>
> Add a Kafka TableSource which reads JSON serialized data.
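
For context, a minimal usage sketch of such a table source, based on the registration calls visible in the test above (registerTableSource, ingest). The Kafka08JsonTableSource class, its constructor shape, the import paths, and the connection properties are assumptions, not taken from this PR:

    import java.util.Properties;

    import org.apache.flink.api.java.table.StreamTableEnvironment;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSource; // assumed path
    import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;       // assumed path

    public class KafkaJsonTableSourceUsage {

            public static void main(String[] args) throws Exception {
                    // Consumer configuration (placeholder values).
                    Properties props = new Properties();
                    props.setProperty("bootstrap.servers", "localhost:9092");
                    props.setProperty("zookeeper.connect", "localhost:2181");
                    props.setProperty("group.id", "json-table-source-demo");

                    // Field names and types matching the JSON layout described in
                    // the test's Javadoc: "long", "string", "boolean", "double".
                    String[] fieldNames = { "long", "string", "boolean", "double" };
                    Class<?>[] fieldTypes = { Long.class, String.class, Boolean.class, Double.class };

                    // Hypothetical constructor; a Kafka-version-specific subclass
                    // (here for Kafka 0.8) is assumed.
                    KafkaTableSource source =
                                    new Kafka08JsonTableSource("test-topic", props, fieldNames, fieldTypes);

                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

                    // Same registration calls as in the test above.
                    tableEnv.registerTableSource("kafka", source);
                    Table result = tableEnv.ingest("kafka");
                    // result can now be queried with the Table API.
            }
    }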


