[BAHIR-99] kudu connector
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/c760e3cf Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/c760e3cf Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/c760e3cf Branch: refs/heads/master Commit: c760e3cfbd23c6c550800a07ef652e7f28e3f213 Parents: ca795cc Author: Joao Boto <b...@boto.pro> Authored: Wed Jul 25 20:17:36 2018 +0200 Committer: Luciano Resende <lrese...@apache.org> Committed: Thu Sep 13 19:32:13 2018 -0700 ---------------------------------------------------------------------- .travis.yml | 21 +- flink-connector-kudu/README.md | 98 +++++++++ flink-connector-kudu/dockers/docker-compose.yml | 92 ++++++++ flink-connector-kudu/dockers/role/Dockerfile | 41 ++++ .../dockers/role/docker-entrypoint.sh | 69 ++++++ flink-connector-kudu/dockers/run_kudu_tests.sh | 68 ++++++ flink-connector-kudu/dockers/start-images.sh | 42 ++++ flink-connector-kudu/dockers/stop-images.sh | 33 +++ flink-connector-kudu/pom.xml | 103 +++++++++ .../connectors/kudu/KuduInputFormat.java | 218 +++++++++++++++++++ .../connectors/kudu/KuduOutputFormat.java | 110 ++++++++++ .../streaming/connectors/kudu/KuduSink.java | 106 +++++++++ .../kudu/connector/KuduColumnInfo.java | 161 ++++++++++++++ .../kudu/connector/KuduConnector.java | 133 +++++++++++ .../kudu/connector/KuduFilterInfo.java | 173 +++++++++++++++ .../connectors/kudu/connector/KuduMapper.java | 146 +++++++++++++ .../connectors/kudu/connector/KuduRow.java | 137 ++++++++++++ .../kudu/connector/KuduTableInfo.java | 133 +++++++++++ .../connectors/kudu/KuduInputFormatTest.java | 91 ++++++++ .../connectors/kudu/KuduOuputFormatTest.java | 93 ++++++++ .../streaming/connectors/kudu/KuduSinkTest.java | 89 ++++++++ .../connectors/kudu/connector/KuduDatabase.java | 89 ++++++++ pom.xml | 3 +- 23 files changed, 2247 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 083e75d..691667c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,20 +28,39 @@ before_cache: language: java +services: + - docker + matrix: include: - jdk: oraclejdk8 env: - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" + - MAVEN_PROFILE="default" - CACHE_NAME=JDK8_F130_A - jdk: openjdk8 env: - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" - CACHE_NAME=JDK8_F130_C + - MAVEN_PROFILE="default" + - CACHE_NAME=JDK8_F130_B + - jdk: openjdk8 + env: + - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" + - MAVEN_PROFILE="test-kudu" + - CACHE_NAME=JDK8_F130_KUDU before_install: - ./dev/change-scala-version.sh $SCALA_VERSION install: true -script: mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION +script: + - | + if [[ $MAVEN_PROFILE == "default" ]]; then + mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION + fi + - | + if [[ $MAVEN_PROFILE == "test-kudu" ]]; then + flink-connector-kudu/dockers/run_kudu_tests.sh + fi http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/README.md ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md new file mode 100644 index 0000000..af2985b --- /dev/null +++ b/flink-connector-kudu/README.md @@ -0,0 +1,98 @@ +# Flink Kudu Connector + +This connector provides a source (```KuduInputFormat```) and a sink/output (```KuduSink``` and ```KuduOutputFormat```, respectively) that can read and write to [Kudu](https://kudu.apache.org/). 
To use this connector, add the +following dependency to your project: + + <dependency> + <groupId>org.apache.bahir</groupId> + <artifactId>flink-connector-kudu_2.11</artifactId> + <version>1.1-SNAPSHOT</version> + </dependency> + +*Version Compatibility*: This module is compatible with Apache Kudu *1.7.1* (last stable version). + +Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution. +See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-stable/start/dependencies.html). + +## Installing Kudu + +Follow the instructions from the [Kudu Installation Guide](https://kudu.apache.org/docs/installation.html). +Optionally, you can use the docker images provided in dockers folder. + +## KuduInputFormat + +``` +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +env.setParallelism(PARALLELISM); + +// create a table info object +KuduTableInfo tableInfo = KuduTableInfo.Builder + .create("books") + .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) + .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) + .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) + .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) + .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .build(); + +// Pass the tableInfo to the KuduInputFormat and provide kuduMaster ips +env.createInput(new KuduInputFormat<>("172.25.0.6", tableInfo)) + .count(); + +env.execute(); +``` + +## KuduOutputFormat + +``` +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +env.setParallelism(PARALLELISM); + +// create a table info object +KuduTableInfo tableInfo = KuduTableInfo.Builder + .create("books") + .createIfNotExist(true) + .replicas(1) + .addColumn(KuduColumnInfo.Builder.create("id", 
Type.INT32).key(true).hashKey(true).build()) + .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) + .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) + .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) + .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .build(); + +... + +env.fromCollection(books) + .output(new KuduOutputFormat<>("172.25.0.6", tableInfo)); + +env.execute(); +``` + +## KuduSink + +``` +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +env.setParallelism(PARALLELISM); + +// create a table info object +KuduTableInfo tableInfo = KuduTableInfo.Builder + .create("books") + .createIfNotExist(true) + .replicas(1) + .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build()) + .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build()) + .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build()) + .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build()) + .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build()) + .build(); + +... + +env.fromCollection(books) + .addSink(new KuduSink<>("172.25.0.6", tableInfo)); + +env.execute(); +``` http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/docker-compose.yml ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/dockers/docker-compose.yml b/flink-connector-kudu/dockers/docker-compose.yml new file mode 100644 index 0000000..d2c95bb --- /dev/null +++ b/flink-connector-kudu/dockers/docker-compose.yml @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: '2' + +services: + + kudu-master: + image: eskabetxe/kudu + container_name: kudu-master + hostname: 172.25.0.6 + ports: + - "8051:8051" + volumes: + - /var/lib/kudu/master + command: master + networks: + mynet: + ipv4_address: 172.25.0.6 + + kudu-server1: + image: eskabetxe/kudu + container_name: kudu-server1 + hostname: 172.25.0.7 + environment: + - KUDU_MASTER=172.25.0.6 + ports: + - "8054:8050" + volumes: + - /var/lib/kudu/server + command: tserver + networks: + mynet: + ipv4_address: 172.25.0.7 + links: + - kudu-master + + kudu-server2: + image: eskabetxe/kudu + container_name: kudu-server2 + hostname: 172.25.0.8 + environment: + - KUDU_MASTER=172.25.0.6 + ports: + - "8052:8050" + volumes: + - /var/lib/kudu/server + command: tserver + networks: + mynet: + ipv4_address: 172.25.0.8 + links: + - kudu-master + + kudu-server3: + image: eskabetxe/kudu + container_name: kudu-server3 + hostname: 172.25.0.9 + environment: + - KUDU_MASTER=172.25.0.6 + ports: + - "8053:8050" + volumes: + - /var/lib/kudu/server + command: tserver + networks: + mynet: + ipv4_address: 172.25.0.9 + links: + - kudu-master + +networks: + mynet: + driver: bridge + ipam: + config: + - subnet: 172.25.0.0/24 + ip_range: 172.25.0.2/24 + gateway: 172.25.0.1 http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/role/Dockerfile 
---------------------------------------------------------------------- diff --git a/flink-connector-kudu/dockers/role/Dockerfile b/flink-connector-kudu/dockers/role/Dockerfile new file mode 100644 index 0000000..b14b087 --- /dev/null +++ b/flink-connector-kudu/dockers/role/Dockerfile @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +FROM bitnami/minideb:jessie +MAINTAINER eskabetxe + +RUN set -x \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ + bzip2 unzip xz-utils wget \ + && cd /etc/apt/sources.list.d \ + && wget -qO - http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/archive.key | apt-key add - \ + && wget http://archive.cloudera.com/kudu/debian/jessie/amd64/kudu/cloudera.list \ + && apt-get update \ + && apt-get install --no-install-recommends -y \ + kudu kudu-master kudu-tserver libkuduclient0 libkuduclient-dev \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get autoclean + +VOLUME /var/lib/kudu/master /var/lib/kudu/tserver + +COPY docker-entrypoint.sh / +RUN chmod a+x /docker-entrypoint.sh + +ENTRYPOINT ["/docker-entrypoint.sh"] +EXPOSE 8050 8051 7050 7051 +#CMD ["help"] http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/role/docker-entrypoint.sh ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/dockers/role/docker-entrypoint.sh b/flink-connector-kudu/dockers/role/docker-entrypoint.sh new file mode 100644 index 0000000..770850c --- /dev/null +++ b/flink-connector-kudu/dockers/role/docker-entrypoint.sh @@ -0,0 +1,69 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +set -e + +function do_help { + echo HELP: + echo "Supported commands:" + echo " master - Start a Kudu Master" + echo " tserver - Start a Kudu TServer" + echo " single - Start a Kudu Master+TServer in one container" + echo " kudu - Run the Kudu CLI" + echo " help - print useful information and exit" + echo "" + echo "Other commands can be specified to run shell commands." + echo "Set the environment variable KUDU_OPTS to pass additional" + echo "arguments to the kudu process. DEFAULT_KUDU_OPTS contains" + echo "a recommended base set of options." + + exit 0 +} + +DEFAULT_KUDU_OPTS="-logtostderr \ + -fs_wal_dir=/var/lib/kudu/$1 \ + -fs_data_dirs=/var/lib/kudu/$1 \ + -use_hybrid_clock=false" + +KUDU_OPTS=${KUDU_OPTS:-${DEFAULT_KUDU_OPTS}} + +if [ "$1" = 'master' ]; then + exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_OPTS} +elif [ "$1" = 'tserver' ]; then + exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver \ + -tserver_master_addrs ${KUDU_MASTER} ${KUDU_OPTS} +elif [ "$1" = 'single' ]; then + KUDU_MASTER=boot2docker + KUDU_MASTER_OPTS="-logtostderr \ + -fs_wal_dir=/var/lib/kudu/master \ + -fs_data_dirs=/var/lib/kudu/master \ + -use_hybrid_clock=false" + KUDU_TSERVER_OPTS="-logtostderr \ + -fs_wal_dir=/var/lib/kudu/tserver \ + -fs_data_dirs=/var/lib/kudu/tserver \ + -use_hybrid_clock=false" + exec kudu-master -fs_wal_dir /var/lib/kudu/master ${KUDU_MASTER_OPTS} & + sleep 5 + exec kudu-tserver -fs_wal_dir /var/lib/kudu/tserver -tserver_master_addrs ${KUDU_MASTER} ${KUDU_TSERVER_OPTS} +elif [ "$1" = 'kudu' ]; then + shift; # Remove first arg and pass remainder to kudu cli + exec kudu "$@" +elif [ "$1" = 'help' ]; then + do_help +fi + +exec "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/run_kudu_tests.sh
---------------------------------------------------------------------- diff --git a/flink-connector-kudu/dockers/run_kudu_tests.sh b/flink-connector-kudu/dockers/run_kudu_tests.sh new file mode 100755 index 0000000..58593d6 --- /dev/null +++ b/flink-connector-kudu/dockers/run_kudu_tests.sh @@ -0,0 +1,68 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# Runs all tests with Kudu server in docker containers. + +set -euo pipefail -x + +# http://stackoverflow.com/questions/3572030/bash-script-absolute-path-with-osx +function absolutepath() { + [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" +} + +SCRIPT_DIR=$(dirname $(absolutepath "$0")) + +PROJECT_ROOT="${SCRIPT_DIR}/../.." 
+ +DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml" + + +function build_image() { + docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role + + #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}" +} + +function start_docker_container() { + # stop already running containers + #docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true + + # start containers + docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d +} + +function cleanup_docker_container() { + docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down + #true +} + +build_image + +start_docker_container + +#run product tests +pushd ${PROJECT_ROOT} +set +e +mvn test -pl flink-connector-kudu -P test-kudu +EXIT_CODE=$? +set -e +popd + +cleanup_docker_container + +exit ${EXIT_CODE} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/start-images.sh ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/dockers/start-images.sh b/flink-connector-kudu/dockers/start-images.sh new file mode 100755 index 0000000..fad3de6 --- /dev/null +++ b/flink-connector-kudu/dockers/start-images.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +function absolutepath() { + [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" +} + +function build_image() { + docker build -t eskabetxe/kudu ${SCRIPT_DIR}/role + + #docker-compose build -f "${DOCKER_COMPOSE_LOCATION}" +} + +function start_docker_container() { + # stop already running containers + docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down || true + + # start containers + docker-compose -f "${DOCKER_COMPOSE_LOCATION}" up -d +} + +SCRIPT_DIR=$(dirname $(absolutepath "$0")) +DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml" + +build_image + +start_docker_container http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/dockers/stop-images.sh ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/dockers/stop-images.sh b/flink-connector-kudu/dockers/stop-images.sh new file mode 100755 index 0000000..9ae52c1 --- /dev/null +++ b/flink-connector-kudu/dockers/stop-images.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +function absolutepath() { + [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" +} + +function cleanup_docker_container() { + docker-compose -f "${DOCKER_COMPOSE_LOCATION}" down + + docker rm eskabetxe/kudu +} + +SCRIPT_DIR=$(dirname $(absolutepath "$0")) +DOCKER_COMPOSE_LOCATION="${SCRIPT_DIR}/docker-compose.yml" + +cleanup_docker_container + http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml new file mode 100644 index 0000000..348371b --- /dev/null +++ b/flink-connector-kudu/pom.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink-parent_2.11</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kudu_2.11</artifactId> + <name>flink-connector-kudu</name> + <packaging>jar</packaging> + + <properties> + <kudu.version>1.7.1</kudu.version> + <junit.version>5.2.0</junit.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-client</artifactId> + <version>${kudu.version}</version> + </dependency> + + <!--test dependencies--> + + <dependency> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-client</artifactId> + <version>${kudu.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <profiles> + <profile> + <id>default</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/*Test.java</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>test-kudu</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + 
</plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java new file mode 100644 index 0000000..617e317 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduInputFormat.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu; + +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.flink.streaming.connectors.kudu.connector.*; +import org.apache.flink.util.Preconditions; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class KuduInputFormat extends RichInputFormat<KuduRow, KuduInputFormat.KuduInputSplit> { + + private String kuduMasters; + private KuduTableInfo tableInfo; + private List<KuduFilterInfo> tableFilters; + private List<String> tableProjections; + private Long rowsLimit; + private boolean endReached; + + private transient KuduConnector tableContext; + private transient KuduScanner scanner; + private transient RowResultIterator resultIterator; + + private static final Logger LOG = LoggerFactory.getLogger(KuduInputFormat.class); + + public KuduInputFormat(String kuduMasters, KuduTableInfo tableInfo) { + Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); + this.kuduMasters = kuduMasters; + + Preconditions.checkNotNull(tableInfo,"tableInfo could not be null"); + this.tableInfo = tableInfo; + + this.endReached = false; + } + + public KuduInputFormat withTableFilters(KuduFilterInfo... tableFilters) { + return withTableFilters(Arrays.asList(tableFilters)); + } + + public KuduInputFormat withTableFilters(List<KuduFilterInfo> tableFilters) { + this.tableFilters = tableFilters; + return this; + } + + public KuduInputFormat withTableProjections(String... 
tableProjections) { + return withTableProjections(Arrays.asList(tableProjections)); + } + public KuduInputFormat withTableProjections(List<String> tableProjections) { + this.tableProjections = tableProjections; + return this; + } + + public KuduInputFormat withRowsLimit(Long rowsLimit) { + this.rowsLimit = rowsLimit; + return this; + } + + @Override + public void configure(Configuration parameters) { + + } + + @Override + public void open(KuduInputSplit split) throws IOException { + endReached = false; + startTableContext(); + + scanner = tableContext.scanner(split.getScanToken()); + resultIterator = scanner.nextRows(); + } + + @Override + public void close() { + if (scanner != null) { + try { + scanner.close(); + } catch (KuduException e) { + e.printStackTrace(); + } + } + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + private void startTableContext() throws IOException { + if (tableContext == null) { + tableContext = new KuduConnector(kuduMasters, tableInfo); + } + } + + @Override + public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException { + startTableContext(); + Preconditions.checkNotNull(tableContext,"tableContext should not be null"); + + List<KuduScanToken> tokens = tableContext.scanTokens(tableFilters, tableProjections, rowsLimit); + + KuduInputSplit[] splits = new KuduInputSplit[tokens.size()]; + + for (int i = 0; i < tokens.size(); i++) { + KuduScanToken token = tokens.get(i); + + List<String> locations = new ArrayList<>(token.getTablet().getReplicas().size()); + + for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) { + locations.add(getLocation(replica.getRpcHost(), replica.getRpcPort())); + } + + KuduInputSplit split = new KuduInputSplit( + token.serialize(), + i, + 
locations.toArray(new String[locations.size()]) + ); + splits[i] = split; + } + + if (splits.length < minNumSplits) { + LOG.warn(" The minimum desired number of splits with your configured parallelism level " + + "is {}. Current kudu splits = {}. {} instances will remain idle.", + minNumSplits, + splits.length, + (minNumSplits - splits.length) + ); + } + + return splits; + } + + @Override + public boolean reachedEnd() throws IOException { + return endReached; + } + + @Override + public KuduRow nextRecord(KuduRow reuse) throws IOException { + // check that current iterator has next rows + if (this.resultIterator.hasNext()) { + RowResult row = this.resultIterator.next(); + return KuduMapper.toKuduRow(row); + } + // if not, check that current scanner has more iterators + else if (scanner.hasMoreRows()) { + this.resultIterator = scanner.nextRows(); + return nextRecord(reuse); + } + else { + endReached = true; + } + return null; + } + + /** + * Returns an endpoint url in the following format: <host>:<port> + * + * @param host Hostname + * @param port Port + * @return Formatted URL + */ + private String getLocation(String host, Integer port) { + StringBuilder builder = new StringBuilder(); + builder.append(host).append(":").append(port); + return builder.toString(); + } + + + public class KuduInputSplit extends LocatableInputSplit { + + private byte[] scanToken; + + /** + * Creates a new KuduInputSplit + * @param splitNumber the number of the input split + * @param hostnames The names of the hosts storing the data this input split refers to. 
+ */ + public KuduInputSplit(byte[] scanToken, final int splitNumber, final String[] hostnames) { + super(splitNumber, hostnames); + + this.scanToken = scanToken; + } + + public byte[] getScanToken() { + return scanToken; + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java new file mode 100644 index 0000000..5c23f36 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduOutputFormat.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu; + +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class KuduOutputFormat<OUT extends KuduRow> implements OutputFormat<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(KuduOutputFormat.class); + + private String kuduMasters; + private KuduTableInfo tableInfo; + private KuduConnector.Consistency consistency; + private KuduConnector.WriteMode writeMode; + + private transient KuduConnector tableContext; + + + public KuduOutputFormat(String kuduMasters, KuduTableInfo tableInfo) { + Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); + this.kuduMasters = kuduMasters; + + Preconditions.checkNotNull(tableInfo,"tableInfo could not be null"); + this.tableInfo = tableInfo; + this.consistency = KuduConnector.Consistency.STRONG; + this.writeMode = KuduConnector.WriteMode.UPSERT; + } + + public KuduOutputFormat<OUT> withEventualConsistency() { + this.consistency = KuduConnector.Consistency.EVENTUAL; + return this; + } + + public KuduOutputFormat<OUT> withStrongConsistency() { + this.consistency = KuduConnector.Consistency.STRONG; + return this; + } + + public KuduOutputFormat<OUT> withUpsertWriteMode() { + this.writeMode = KuduConnector.WriteMode.UPSERT; + return this; + } + + public KuduOutputFormat<OUT> withInsertWriteMode() { + this.writeMode = KuduConnector.WriteMode.INSERT; + return this; + } + + public KuduOutputFormat<OUT> withUpdateWriteMode() { + this.writeMode = KuduConnector.WriteMode.UPDATE; + return this; + } + + @Override + public void 
configure(Configuration parameters) { + + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + startTableContext(); + } + + private void startTableContext() throws IOException { + if (tableContext != null) return; + tableContext = new KuduConnector(kuduMasters, tableInfo); + } + + @Override + public void writeRecord(OUT kuduRow) throws IOException { + try { + tableContext.writeRow(kuduRow, consistency, writeMode); + } catch (Exception e) { + throw new IOException(e.getLocalizedMessage(), e); + } + } + + @Override + public void close() throws IOException { + if (this.tableContext == null) return; + try { + this.tableContext.close(); + } catch (Exception e) { + throw new IOException(e.getLocalizedMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java new file mode 100644 index 0000000..120d5c5 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kudu; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.kudu.connector.KuduConnector; +import org.apache.flink.streaming.connectors.kudu.connector.KuduRow; +import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class KuduSink<OUT extends KuduRow> extends RichSinkFunction<OUT> { + + // Fixed copy-paste bug: logger was created for KuduOutputFormat.class, which made KuduSink log under the wrong logger name. + private static final Logger LOG = LoggerFactory.getLogger(KuduSink.class); + + private String kuduMasters; + private KuduTableInfo tableInfo; + private KuduConnector.Consistency consistency; + private KuduConnector.WriteMode writeMode; + + private transient KuduConnector tableContext; + + + public KuduSink(String kuduMasters, KuduTableInfo tableInfo) { + Preconditions.checkNotNull(kuduMasters,"kuduMasters could not be null"); + this.kuduMasters = kuduMasters; + + Preconditions.checkNotNull(tableInfo,"tableInfo could not be null"); + this.tableInfo = tableInfo; + this.consistency = KuduConnector.Consistency.STRONG; + this.writeMode = KuduConnector.WriteMode.UPSERT; + } + + public KuduSink<OUT> withEventualConsistency() { + this.consistency = KuduConnector.Consistency.EVENTUAL; + return this; + } + + public KuduSink<OUT> withStrongConsistency() { + this.consistency = KuduConnector.Consistency.STRONG; + return this; + } + + public 
KuduSink<OUT> withUpsertWriteMode() { + this.writeMode = KuduConnector.WriteMode.UPSERT; + return this; + } + + public KuduSink<OUT> withInsertWriteMode() { + this.writeMode = KuduConnector.WriteMode.INSERT; + return this; + } + + public KuduSink<OUT> withUpdateWriteMode() { + this.writeMode = KuduConnector.WriteMode.UPDATE; + return this; + } + + @Override + public void open(Configuration parameters) throws IOException { + startTableContext(); + } + + private void startTableContext() throws IOException { + if (tableContext != null) return; + tableContext = new KuduConnector(kuduMasters, tableInfo); + } + + + @Override + public void invoke(OUT kuduRow) throws Exception { + try { + tableContext.writeRow(kuduRow, consistency, writeMode); + } catch (Exception e) { + throw new IOException(e.getLocalizedMessage(), e); + } + } + + @Override + public void close() throws Exception { + if (this.tableContext == null) return; + try { + this.tableContext.close(); + } catch (Exception e) { + throw new IOException(e.getLocalizedMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java new file mode 100644 index 0000000..4dfc0b8 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduColumnInfo.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kudu.connector; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; + +import java.io.Serializable; + +public class KuduColumnInfo implements Serializable { + + private String name; + private Type type; + private boolean key; + private boolean rangeKey; + private boolean hashKey; + private boolean nullable; + private Object defaultValue; + private int blockSize; + private Encoding encoding; + private Compression compression; + + private KuduColumnInfo(String name, Type type) { + this.name = name; + this.type = type; + this.blockSize = 0; + this.key = false; + this.rangeKey = false; + this.hashKey = false; + this.nullable = false; + this.defaultValue = null; + this.encoding = Encoding.AUTO; + this.compression = Compression.DEFAULT; + } + + protected String name() { + return name; + } + + protected boolean isRangeKey() { + return rangeKey; + } + + protected boolean isHashKey() { + return hashKey; + } + + protected ColumnSchema columnSchema() { + return new ColumnSchema.ColumnSchemaBuilder(name, type) + .key(key) + .nullable(nullable) + .defaultValue(defaultValue) + .desiredBlockSize(blockSize) + .encoding(encoding.encode) + .compressionAlgorithm(compression.algorithm) + .build(); + } + + public static class Builder { + private KuduColumnInfo column; + + private Builder(String name, Type type) { + this.column = new 
KuduColumnInfo(name, type); + } + + public static Builder create(String name, Type type) { + return new Builder(name, type); + } + + public Builder key(boolean key) { + this.column.key = key; + return this; + } + + public Builder rangeKey(boolean rangeKey) { + this.column.rangeKey = rangeKey; + return this; + } + + public Builder hashKey(boolean hashKey) { + this.column.hashKey = hashKey; + return this; + } + + public Builder nullable(boolean nullable) { + this.column.nullable = nullable; + return this; + } + + public Builder defaultValue(Object defaultValue) { + this.column.defaultValue = defaultValue; + return this; + } + + public Builder desiredBlockSize(int blockSize) { + this.column.blockSize = blockSize; + return this; + } + + public Builder encoding(Encoding encoding) { + this.column.encoding = encoding; + return this; + } + + public Builder compressionAlgorithm(Compression compression) { + this.column.compression = compression; + return this; + } + + public KuduColumnInfo build() { + return column; + } + } + + public enum Compression { + UNKNOWN(ColumnSchema.CompressionAlgorithm.UNKNOWN), + DEFAULT(ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION), + WITHOUT(ColumnSchema.CompressionAlgorithm.NO_COMPRESSION), + SNAPPY(ColumnSchema.CompressionAlgorithm.SNAPPY), + LZ4(ColumnSchema.CompressionAlgorithm.LZ4), + ZLIB(ColumnSchema.CompressionAlgorithm.ZLIB); + + final ColumnSchema.CompressionAlgorithm algorithm; + + Compression(ColumnSchema.CompressionAlgorithm algorithm) { + this.algorithm = algorithm; + } + } + + public enum Encoding { + UNKNOWN(ColumnSchema.Encoding.UNKNOWN), + AUTO(ColumnSchema.Encoding.AUTO_ENCODING), + PLAIN(ColumnSchema.Encoding.PLAIN_ENCODING), + PREFIX(ColumnSchema.Encoding.PREFIX_ENCODING), + GROUP_VARINT(ColumnSchema.Encoding.GROUP_VARINT), + RLE(ColumnSchema.Encoding.RLE), + DICT(ColumnSchema.Encoding.DICT_ENCODING), + BIT_SHUFFLE(ColumnSchema.Encoding.BIT_SHUFFLE); + + final ColumnSchema.Encoding encode; + + 
Encoding(ColumnSchema.Encoding encode) { + this.encode = encode; + } + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java new file mode 100644 index 0000000..0e2e6bc --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduConnector.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.connector; + +import com.stumbleupon.async.Callback; +import org.apache.commons.collections.CollectionUtils; +import org.apache.kudu.client.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class KuduConnector implements AutoCloseable { + + private final Logger LOG = LoggerFactory.getLogger(this.getClass()); + + public enum Consistency {EVENTUAL, STRONG} + public enum WriteMode {INSERT,UPDATE,UPSERT} + + private AsyncKuduClient client; + private KuduTable table; + + public KuduConnector(String kuduMasters, KuduTableInfo tableInfo) throws IOException { + client = client(kuduMasters); + table = table(tableInfo); + } + + private AsyncKuduClient client(String kuduMasters) { + return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMasters).build(); + } + + private KuduTable table(KuduTableInfo infoTable) throws IOException { + KuduClient syncClient = client.syncClient(); + + String tableName = infoTable.getName(); + if (syncClient.tableExists(tableName)) { + return syncClient.openTable(tableName); + } + if (infoTable.createIfNotExist()) { + return syncClient.createTable(tableName, infoTable.getSchema(), infoTable.getCreateTableOptions()); + } + throw new UnsupportedOperationException("table does not exist and is marked to not be created"); + } + + public boolean deleteTable() throws IOException { + String tableName = table.getName(); + client.syncClient().deleteTable(tableName); + return true; + } + + public KuduScanner scanner(byte[] token) throws IOException { + return KuduScanToken.deserializeIntoScanner(token, client.syncClient()); + } + + public List<KuduScanToken> scanTokens(List<KuduFilterInfo> tableFilters, List<String> tableProjections, Long rowLimit) { + KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.syncClient().newScanTokenBuilder(table); + + if (CollectionUtils.isNotEmpty(tableProjections)) { + 
tokenBuilder.setProjectedColumnNames(tableProjections); + } + + if (CollectionUtils.isNotEmpty(tableFilters)) { + tableFilters.stream() + .map(filter -> filter.toPredicate(table.getSchema())) + .forEach(tokenBuilder::addPredicate); + } + + if (rowLimit !=null && rowLimit > 0) { + tokenBuilder.limit(rowLimit); + // FIXME: https://issues.apache.org/jira/browse/KUDU-16 + // Server side limit() operator for java-based scanners are not implemented yet + } + + return tokenBuilder.build(); + } + + public boolean writeRow(KuduRow row, Consistency consistency, WriteMode writeMode) throws Exception { + final Operation operation = KuduMapper.toOperation(table, writeMode, row); + + if (Consistency.EVENTUAL.equals(consistency)) { + AsyncKuduSession session = client.newSession(); + session.apply(operation); + session.flush(); + return session.close().addCallback(new ResponseCallback()).join(); + } else { + KuduSession session = client.syncClient().newSession(); + session.apply(operation); + session.flush(); + return processResponse(session.close()); + } + } + + @Override + public void close() throws Exception { + if (client == null) return; + + client.close(); + } + + // Succeeds unless some response carries a row error; previous code returned isEmpty(), reporting failure for every successful flush, and dereferenced getRowError() without checking hasRowError(). + private Boolean processResponse(List<OperationResponse> operationResponses) { + boolean isOk = true; + for(OperationResponse operationResponse : operationResponses) { + if (operationResponse.hasRowError()) { isOk = false; logResponseError(operationResponse.getRowError()); } + } + return isOk; + } + + private void logResponseError(RowError error) { + LOG.error("Error {} on {}: {} ", error.getErrorStatus(), error.getOperation(), error.toString()); + } + + private class ResponseCallback implements Callback<Boolean, List<OperationResponse>> { + @Override + public Boolean call(List<OperationResponse> operationResponses) { + return processResponse(operationResponses); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java
---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java new file mode 100644 index 0000000..bd20fc8 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduFilterInfo.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.connector; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduPredicate; + +import java.util.List; + + +public class KuduFilterInfo { + + private String column; + private FilterType type; + private Object value; + + private KuduFilterInfo() { } + + public KuduPredicate toPredicate(Schema schema) { + return toPredicate(schema.getColumn(this.column)); + } + public KuduPredicate toPredicate(ColumnSchema column) { + KuduPredicate predicate; + switch (this.type) { + case IS_IN: + predicate = KuduPredicate.newInListPredicate(column, (List) this.value); + break; + case IS_NULL: + predicate = KuduPredicate.newIsNullPredicate(column); + break; + case IS_NOT_NULL: + predicate = KuduPredicate.newIsNotNullPredicate(column); + break; + default: + predicate = predicateComparator(column); + break; + } + return predicate; + } + + private KuduPredicate predicateComparator(ColumnSchema column) { + + KuduPredicate.ComparisonOp comparison = this.type.comparator; + + KuduPredicate predicate; + + switch (column.getType()) { + case STRING: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String)this.value); + break; + case FLOAT: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Float)this.value); + break; + case INT8: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Byte)this.value); + break; + case INT16: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Short)this.value); + break; + case INT32: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Integer)this.value); + break; + case INT64: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Long)this.value); + break; + case DOUBLE: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (Double)this.value); + break; + case BOOL: + predicate = 
KuduPredicate.newComparisonPredicate(column, comparison, (Boolean)this.value); + break; + case UNIXTIME_MICROS: + Long time = (Long)this.value; + predicate = KuduPredicate.newComparisonPredicate(column, comparison, time*1000); + break; + case BINARY: + predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte[])this.value); + break; + default: + throw new IllegalArgumentException("Illegal var type: " + column.getType()); + } + return predicate; + } + + public static class Builder { + private KuduFilterInfo filter; + + private Builder(String column) { + this.filter = new KuduFilterInfo(); + this.filter.column = column; + } + + public static Builder create(String column) { + return new Builder(column); + } + + public Builder greaterThan(Object value) { + return filter(FilterType.GREATER, value); + } + + public Builder lessThan(Object value) { + return filter(FilterType.LESS, value); + } + + public Builder equalTo(Object value) { + return filter(FilterType.EQUAL, value); + } + + public Builder greaterOrEqualTo(Object value) { + return filter(FilterType.GREATER_EQUAL, value); + } + + public Builder lessOrEqualTo(Object value) { + return filter(FilterType.LESS_EQUAL, value); + } + + public Builder isNotNull() { + return filter(FilterType.IS_NOT_NULL, null); + } + + public Builder isNull() { + return filter(FilterType.IS_NULL, null); + } + + public Builder isIn(List values) { + return filter(FilterType.IS_IN, values); + } + + public Builder filter(FilterType type, Object value) { + this.filter.type = type; + this.filter.value = value; + return this; + } + + public KuduFilterInfo build() { + return filter; + } + } + + public enum FilterType { + GREATER(KuduPredicate.ComparisonOp.GREATER), + GREATER_EQUAL(KuduPredicate.ComparisonOp.GREATER_EQUAL), + EQUAL(KuduPredicate.ComparisonOp.EQUAL), + LESS(KuduPredicate.ComparisonOp.LESS), + LESS_EQUAL(KuduPredicate.ComparisonOp.LESS_EQUAL), + IS_NOT_NULL(null), + IS_NULL(null), + IS_IN(null); + + final 
KuduPredicate.ComparisonOp comparator; + + FilterType(KuduPredicate.ComparisonOp comparator) { + this.comparator = comparator; + } + + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java new file mode 100644 index 0000000..b1366ba --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduMapper.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kudu.connector; + + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; + +import java.util.List; + +public final class KuduMapper { + + private KuduMapper() { } + + public static KuduRow toKuduRow(RowResult row) { + Schema schema = row.getColumnProjection(); + List<ColumnSchema> columns = schema.getColumns(); + + KuduRow values = new KuduRow(columns.size()); + for (int i = 0; i < columns.size(); i++) { + String name = schema.getColumnByIndex(i).getName(); + if(row.isNull(i)) { + values.setField(i, name, null); + } else { + Type type = schema.getColumnByIndex(i).getType(); + switch (type) { + case BINARY: + values.setField(i, name, row.getBinary(i)); + break; + case STRING: + values.setField(i, name, row.getString(i)); + break; + case BOOL: + values.setField(i, name, row.getBoolean(i)); + break; + case DOUBLE: + values.setField(i, name, row.getDouble(i)); + break; + case FLOAT: + values.setField(i, name, row.getFloat(i)); + break; + case INT8: + values.setField(i, name, row.getByte(i)); + break; + case INT16: + values.setField(i, name, row.getShort(i)); + break; + case INT32: + values.setField(i, name, row.getInt(i)); + break; + case INT64: + case UNIXTIME_MICROS: + values.setField(i, name, row.getLong(i)); + break; + default: + throw new IllegalArgumentException("Illegal var type: " + type); + } + } + } + return values; + } + + + public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode, KuduRow row) { + final Operation operation = toOperation(table, writeMode); + final PartialRow partialRow = operation.getRow(); + + Schema schema = table.getSchema(); + List<ColumnSchema> columns = schema.getColumns(); + + for (int i = 0; i < columns.size(); i++) { + String columnName = 
schema.getColumnByIndex(i).getName(); + Object value = row.getField(i); + if (value == null) { + partialRow.setNull(columnName); + } else { + Type type = schema.getColumnByIndex(i).getType(); + switch (type) { + case STRING: + partialRow.addString(columnName, (String) value); + break; + case FLOAT: + partialRow.addFloat(columnName, (Float) value); + break; + case INT8: + partialRow.addByte(columnName, (Byte) value); + break; + case INT16: + partialRow.addShort(columnName, (Short) value); + break; + case INT32: + partialRow.addInt(columnName, (Integer) value); + break; + case INT64: + partialRow.addLong(columnName, (Long) value); + break; + case DOUBLE: + partialRow.addDouble(columnName, (Double) value); + break; + case BOOL: + partialRow.addBoolean(columnName, (Boolean) value); + break; + case UNIXTIME_MICROS: + //*1000 to correctly create date on kudu + partialRow.addLong(columnName, ((Long) value) * 1000); + break; + case BINARY: + partialRow.addBinary(columnName, (byte[]) value); + break; + default: + throw new IllegalArgumentException("Illegal var type: " + type); + } + } + } + return operation; + } + + public static Operation toOperation(KuduTable table, KuduConnector.WriteMode writeMode) { + switch (writeMode) { + case INSERT: return table.newInsert(); + case UPDATE: return table.newUpdate(); + case UPSERT: return table.newUpsert(); + } + return table.newUpsert(); + } + +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java new file mode 100644 index 0000000..03f5e5c --- /dev/null +++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduRow.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kudu.connector; + +import org.apache.flink.types.Row; +import org.apache.kudu.Schema; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.*; +import java.util.stream.Stream; + +public class KuduRow extends Row { + + private Map<String, Integer> rowNames; + + public KuduRow(Integer arity) { + super(arity); + rowNames = new LinkedHashMap<>(); + } + + public KuduRow(Object object, Schema schema) { + super(validFields(object)); + for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) { + basicValidation(c.getDeclaredFields()) + .filter(field -> schema.getColumn(field.getName()) != null) + .forEach(cField -> { + try { + cField.setAccessible(true); + setField(schema.getColumnIndex(cField.getName()), cField.getName(), cField.get(object)); + } catch (IllegalAccessException e) { + String error = String.format("Cannot get value for %s", cField.getName()); + throw new IllegalArgumentException(error, e); + } + }); + } + } + + + public Object getField(String name) 
{ + return super.getField(rowNames.get(name)); + } + + public void setField(int pos, String name, Object value) { + super.setField(pos, value); + this.rowNames.put(name, pos); + } + + public boolean isNull(int pos) { + return getField(pos) == null; + } + + private static int validFields(Object object) { + Long validField = 0L; + for (Class<?> c = object.getClass(); c != null; c = c.getSuperclass()) { + validField += basicValidation(c.getDeclaredFields()).count(); + } + return validField.intValue(); + } + + private static Stream<Field> basicValidation(Field[] fields) { + return Arrays.stream(fields) + .filter(cField -> !Modifier.isStatic(cField.getModifiers())) + .filter(cField -> !Modifier.isTransient(cField.getModifiers())); + } + + public Map<String,Object> blindMap() { + Map<String,Object> toRet = new LinkedHashMap<>(); + rowNames.entrySet().stream() + .sorted(Comparator.comparing(Map.Entry::getValue)) + .forEach(entry -> toRet.put(entry.getKey(), super.getField(entry.getValue()))); + return toRet; + } + + public <P> P blind(Class<P> clazz) { + P o = createInstance(clazz); + + for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field cField : fields) { + try { + if(rowNames.containsKey(cField.getName()) + && !Modifier.isStatic(cField.getModifiers()) + && !Modifier.isTransient(cField.getModifiers())) { + + cField.setAccessible(true); + Object value = getField(cField.getName()); + if (value != null) { + if (cField.getType() == value.getClass()) { + cField.set(o, value); + } else if (cField.getType() == Long.class && value.getClass() == Date.class) { + cField.set(o, ((Date) value).getTime()); + } else { + cField.set(o, value); + } + } + } + } catch (IllegalAccessException e) { + String error = String.format("Cannot get value for %s", cField.getName()); + throw new IllegalArgumentException(error, e); + } + } + } + + return o; + + } + + + private <P> P createInstance(Class<P> clazz) { + try { + return 
clazz.getConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + String error = String.format("Cannot create instance for %s", clazz.getSimpleName()); + throw new IllegalArgumentException(error, e); + } + } + + @Override + public String toString() { + return blindMap().toString(); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java ---------------------------------------------------------------------- diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java new file mode 100644 index 0000000..dfea382 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/connector/KuduTableInfo.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.flink.streaming.connectors.kudu.connector;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Serializable description of a Kudu table: its name, replica count, column
 * definitions and whether the table should be created when it does not exist.
 * Instances are assembled through the nested {@link Builder}.
 */
public class KuduTableInfo implements Serializable {

    private static final Integer DEFAULT_REPLICAS = 1;
    private static final boolean DEFAULT_CREATE_IF_NOT_EXIST = false;

    private Integer replicas;
    private String name;
    private boolean createIfNotExist;
    private List<KuduColumnInfo> columns;

    private KuduTableInfo(String name) {
        this.name = name;
        this.replicas = DEFAULT_REPLICAS;
        this.createIfNotExist = DEFAULT_CREATE_IF_NOT_EXIST;
        this.columns = new ArrayList<>();
    }

    public String getName() {
        return name;
    }

    /**
     * Builds the Kudu {@link Schema} from the configured columns.
     *
     * @return the schema, or {@code null} when no columns are configured
     */
    public Schema getSchema() {
        if (hasNotColumns()) {
            return null;
        }
        List<ColumnSchema> schemaColumns = new ArrayList<>();
        for (KuduColumnInfo column : columns) {
            schemaColumns.add(column.columnSchema());
        }
        return new Schema(schemaColumns);
    }

    public boolean createIfNotExist() {
        return createIfNotExist;
    }

    /**
     * Translates this description into Kudu {@link CreateTableOptions}.
     *
     * <p>Hash partitions are only registered when at least one column is
     * flagged as a hash key: the Kudu client rejects an empty hash-column
     * list, so the previous unconditional call failed at create time for
     * tables without hash keys.
     */
    public CreateTableOptions getCreateTableOptions() {
        CreateTableOptions options = new CreateTableOptions();
        if (replicas != null) {
            options.setNumReplicas(replicas);
        }
        if (hasColumns()) {
            List<String> rangeKeys = new ArrayList<>();
            List<String> hashKeys = new ArrayList<>();
            for (KuduColumnInfo column : columns) {
                if (column.isRangeKey()) {
                    rangeKeys.add(column.name());
                }
                if (column.isHashKey()) {
                    hashKeys.add(column.name());
                }
            }
            options.setRangePartitionColumns(rangeKeys);
            // BUGFIX: only add hash partitions when hash-key columns exist;
            // addHashPartitions with an empty column list is rejected by Kudu.
            if (!hashKeys.isEmpty()) {
                options.addHashPartitions(hashKeys, replicas * 2);
            }
        }

        return options;
    }

    public boolean hasNotColumns() {
        return !hasColumns();
    }

    public boolean hasColumns() {
        return (columns != null && !columns.isEmpty());
    }

    /**
     * @deprecated misspelled method name; use {@link #hasColumns()}.
     *             Kept as a delegate for backward compatibility.
     */
    @Deprecated
    public boolean hasColummns() {
        return hasColumns();
    }

    /** Fluent builder for {@link KuduTableInfo}. */
    public static class Builder {
        KuduTableInfo table;

        private Builder(String name) {
            table = new KuduTableInfo(name);
        }

        public static Builder create(String name) {
            return new Builder(name);
        }

        public static Builder open(String name) {
            return new Builder(name);
        }

        public Builder createIfNotExist(boolean createIfNotExist) {
            this.table.createIfNotExist = createIfNotExist;
            return this;
        }

        /** Sets the replica count; a value of 0 keeps the default. */
        public Builder replicas(int replicas) {
            if (replicas == 0) {
                return this;
            }
            this.table.replicas = replicas;
            return this;
        }

        public Builder columns(List<KuduColumnInfo> columns) {
            if (columns == null) {
                return this;
            }
            this.table.columns.addAll(columns);
            return this;
        }

        public Builder addColumn(KuduColumnInfo column) {
            if (column == null) {
                return this;
            }
            this.table.columns.add(column);
            return this;
        }

        public KuduTableInfo build() {
            return table;
        }
    }
}
package org.apache.flink.streaming.connectors.kudu;

import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Integration tests for {@link KuduInputFormat}: argument validation plus
 * full-table and projected reads against the test Kudu cluster provided
 * by {@link KuduDatabase}.
 */
public class KuduInputFormatTest extends KuduDatabase {

    @Test
    public void testInvalidKuduMaster() throws IOException {
        KuduTableInfo tableInfo = booksTableInfo("books", false);
        // A null master address list must be rejected immediately.
        Assertions.assertThrows(NullPointerException.class,
                () -> new KuduInputFormat(null, tableInfo));
    }

    @Test
    public void testInvalidTableInfo() throws IOException {
        // A null table description must be rejected immediately.
        Assertions.assertThrows(NullPointerException.class,
                () -> new KuduInputFormat(hostsCluster, null));
    }

    @Test
    public void testInputFormat() throws Exception {
        KuduTableInfo tableInfo = booksTableInfo("books", true);
        setUpDatabase(tableInfo);
        try {
            List<KuduRow> fetched = readRows(tableInfo);
            Assertions.assertEquals(5, fetched.size());
        } finally {
            cleanDatabase(tableInfo);
        }
    }

    @Test
    public void testInputFormatWithProjection() throws Exception {
        KuduTableInfo tableInfo = booksTableInfo("books", true);
        setUpDatabase(tableInfo);
        try {
            List<KuduRow> fetched = readRows(tableInfo, "title", "id");
            Assertions.assertEquals(5, fetched.size());
            // Only the two projected columns should be materialized per row.
            for (KuduRow fetchedRow : fetched) {
                Assertions.assertEquals(2, fetchedRow.getArity());
            }
        } finally {
            cleanDatabase(tableInfo);
        }
    }

    /**
     * Reads every row of {@code tableInfo}, optionally restricted to the
     * given column projection. Shared with the output-format and sink tests.
     */
    public static List<KuduRow> readRows(KuduTableInfo tableInfo, String... fieldProjection) throws Exception {
        KuduInputFormat inputFormat = new KuduInputFormat(hostsCluster, tableInfo)
                .withTableProjections(fieldProjection);

        List<KuduRow> collected = new ArrayList<>();
        for (KuduInputFormat.KuduInputSplit split : inputFormat.createInputSplits(1)) {
            inputFormat.open(split);
            while (!inputFormat.reachedEnd()) {
                KuduRow record = inputFormat.nextRecord(new KuduRow(5));
                if (record != null) {
                    collected.add(record);
                }
            }
        }
        inputFormat.close();

        return collected;
    }
}
package org.apache.flink.streaming.connectors.kudu;

import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

/**
 * Integration tests for {@link KuduOutputFormat}: argument validation and
 * write paths under strong and eventual consistency.
 *
 * <p>NOTE(review): the class name is misspelled ("Ouput" instead of
 * "Output"); renaming would change the public identifier/file name, so it
 * is only flagged here.
 */
public class KuduOuputFormatTest extends KuduDatabase {

    @Test
    public void testInvalidKuduMaster() throws IOException {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
    }

    @Test
    public void testInvalidTableInfo() throws IOException {
        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
    }

    @Test
    public void testNotTableExist() throws IOException {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
        // Parameterized type instead of the original raw KuduOutputFormat.
        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo);
        // createIfNotExist=false on a missing table must fail on open.
        Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1));
    }

    @Test
    public void testOutputWithStrongConsistency() throws Exception {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
                .withStrongConsistency();
        outputFormat.open(0, 1);

        for (KuduRow kuduRow : booksDataRow()) {
            outputFormat.writeRecord(kuduRow);
        }
        outputFormat.close();

        // Strong consistency: rows must be visible immediately after close.
        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
        Assertions.assertEquals(5, rows.size());

        cleanDatabase(tableInfo);
    }

    @Test
    public void testOutputWithEventualConsistency() throws Exception {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
        KuduOutputFormat<KuduRow> outputFormat = new KuduOutputFormat<>(hostsCluster, tableInfo)
                .withEventualConsistency();
        outputFormat.open(0, 1);

        for (KuduRow kuduRow : booksDataRow()) {
            outputFormat.writeRecord(kuduRow);
        }

        // sleep to allow eventual consistency to finish
        Thread.sleep(1000);

        outputFormat.close();

        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
        Assertions.assertEquals(5, rows.size());

        cleanDatabase(tableInfo);
    }

}
package org.apache.flink.streaming.connectors.kudu;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kudu.connector.KuduDatabase;
import org.apache.flink.streaming.connectors.kudu.connector.KuduRow;
import org.apache.flink.streaming.connectors.kudu.connector.KuduTableInfo;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

/**
 * Integration tests for {@link KuduSink}: argument validation and write
 * paths under strong and eventual consistency.
 */
public class KuduSinkTest extends KuduDatabase {

    @Test
    public void testInvalidKuduMaster() throws IOException {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(null, tableInfo));
    }

    @Test
    public void testInvalidTableInfo() throws IOException {
        Assertions.assertThrows(NullPointerException.class, () -> new KuduOutputFormat<>(hostsCluster, null));
    }

    @Test
    public void testNotTableExist() throws IOException {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false);
        // Parameterized type instead of the original raw KuduSink.
        KuduSink<KuduRow> sink = new KuduSink<>(hostsCluster, tableInfo);
        // createIfNotExist=false on a missing table must fail on open.
        Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration()));
    }

    @Test
    public void testOutputWithStrongConsistency() throws Exception {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
        KuduSink<KuduRow> sink = new KuduSink<>(hostsCluster, tableInfo)
                .withStrongConsistency();
        sink.open(new Configuration());

        for (KuduRow kuduRow : booksDataRow()) {
            sink.invoke(kuduRow);
        }
        sink.close();

        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
        Assertions.assertEquals(5, rows.size());

        // BUGFIX: drop the test table afterwards, as every sibling test does;
        // the original leaked one table per run on the test cluster.
        cleanDatabase(tableInfo);
    }

    @Test
    public void testOutputWithEventualConsistency() throws Exception {
        KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), true);
        KuduSink<KuduRow> sink = new KuduSink<>(hostsCluster, tableInfo)
                .withEventualConsistency();
        sink.open(new Configuration());

        for (KuduRow kuduRow : booksDataRow()) {
            sink.invoke(kuduRow);
        }

        // sleep to allow eventual consistency to finish
        Thread.sleep(1000);

        sink.close();

        List<KuduRow> rows = KuduInputFormatTest.readRows(tableInfo);
        Assertions.assertEquals(5, rows.size());

        // BUGFIX: cleanup was missing here as well.
        cleanDatabase(tableInfo);
    }

}