[ https://issues.apache.org/jira/browse/BAHIR-99?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097663#comment-16097663 ]
ASF GitHub Bot commented on BAHIR-99: ------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/bahir-flink/pull/17#discussion_r128921944 --- Diff: flink-connector-kudu/src/main/java/es/accenture/flink/Utils/Utils.java --- @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package es.accenture.flink.Utils; + +import es.accenture.flink.Sink.KuduOutputFormat; +import es.accenture.flink.Utils.Exceptions.KuduClientException; +import es.accenture.flink.Utils.Exceptions.KuduTableException; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.*; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class Utils { + + //Kudu variables + private KuduClient client; + private KuduSession session; + + // LOG4J + private final static Logger logger = Logger.getLogger(Utils.class); + + /** + * Builder Util Class which creates a Kudu client and log in to be able to perform operations later + * @param host Kudu's host + * @throws KuduClientException In case of exception caused by Kudu Client + */ + public Utils(String host) throws KuduClientException { + this.client = new KuduClient.KuduClientBuilder(host).build(); + if (client == null){ + throw new KuduClientException("ERROR: param \"host\" not valid, can't establish connection"); + } + this.session = this.client.newSession(); + } + + /** + * Return an instance of the table indicated in the settings + * + * In case that the table exists, return an instance of the table + * In case that the table doesn't exist, create a new table with the data provided and return an instance + * In both cases,takes into account the way of the table to perfom some operations or others + * + * If the mode is CREATE: + * + * If the table exists: return error (Can not create table that already exists) + * If the table doesn't exist and the list of column names has not been provided: return error + * If the table doesn't exist and the list of column names has been provided: create a new table with data provided and return an instance + * + * If the mode is APPEND: + * + * If the table exists: return the instance in the table + * If the 
table doesn't exist: return error + * + * If the mode is OVERRIDE: + * + * If the table exist: delete all rows of this table and return an instance of it + * If the table doesn't exist: return error + * + * + * @param tableName Table name to use + * @param tableMode Operations mode for operate with the table (CREATE, APPEND, OVERRIDE) + * @return Instance of the table indicated + * @throws KuduTableException In case of can't access to a table o can't create it (wrong params or not existing table) + * @throws KuduException In case of error of Kudu + */ + public KuduTable useTable(String tableName, Integer tableMode) throws KuduTableException, KuduException { + KuduTable table; + + if (tableMode == KuduOutputFormat.CREATE) { + logger.error("Bad call method, use useTable(String tableName, String [] fieldsNames, RowSerializable row) instead"); + table = null; + }else if (tableMode == KuduOutputFormat.APPEND) { + logger.info("Modo APPEND"); + try { + if (client.tableExists(tableName)) { + //logger.info("SUCCESS: There is the table with the name \"" + tableName + "\""); + table = client.openTable(tableName); + } else { + logger.error("ERROR: The table doesn't exist"); + throw new KuduTableException("ERROR: The table doesn't exist, so can't do APPEND operation"); + } + } catch (Exception e) { + throw new KuduTableException("ERROR: param \"host\" not valid, can't establish connection"); + } + }else if (tableMode == KuduOutputFormat.OVERRIDE) { + logger.info("Modo OVERRIDE"); + try { + if (client.tableExists(tableName)) { + logger.info("SUCCESS: There is the table with the name \"" + tableName + "\". 
Emptying the table"); + clearTable(tableName); + table = client.openTable(tableName); + } else { + logger.error("ERROR: The table doesn't exist"); + throw new KuduTableException("ERROR: The table doesn't exist, so can't do OVERRIDE operation"); + } + } catch (Exception e) { + throw new KuduTableException("ERROR: param \"host\" not valid, can't establish connection"); + } + }else { + throw new KuduTableException("ERROR: Incorrect parameters, please check the constructor method. Incorrect \"tableMode\" parameter."); + } + return table; + } + + /** + * Returns an instance of the table requested in parameters + * If the table exists, returns an instance of the table + * If the table doesn't exist, creates a new table with the data provided and returns an instance + * + * @param tableName Table name to use + * @param fieldsNames List of names of columns of the table (to create table) + * @param row List of values to insert a row in the table (to know the types of columns) + * @return Instance of the table indicated + * @throws IllegalArgumentException In case of wrong parameters + * @throws KuduException In case of exception caused by Kudu + */ + public KuduTable useTable(String tableName, String [] fieldsNames, RowSerializable row) throws IllegalArgumentException, KuduException { + KuduTable table; + + if (client.tableExists(tableName)){ + logger.info("The table exists"); + table = client.openTable(tableName); + } else { + if (tableName == null || tableName.equals("")) { + throw new IllegalArgumentException("ERROR: Incorrect parameters, please check the constructor method. Incorrect \"tableName\" parameter."); + + } else if (fieldsNames == null || fieldsNames[0].isEmpty()) { + throw new IllegalArgumentException("ERROR: Incorrect parameters, please check the constructor method. Missing \"fields\" parameter."); + + } else if (row == null){ + throw new IllegalArgumentException("ERROR: Incorrect parameters, please check the constructor method. 
Incorrect \"row\" parameter."); + + } else { + logger.info("The table doesn't exist"); + table = createTable(tableName, fieldsNames, row); + } + } + return table; + } + /** + * Create a new Kudu table and return the instance of this table + * + * @param tableName name of the table to create + * @param fieldsNames list name columns of the table + * @param row list of values to insert a row in the table( to know the types of columns) + * @return instance of the table indicated + * @throws KuduException In case of exception caused by Kudu + */ + public KuduTable createTable (String tableName, String [] fieldsNames, RowSerializable row) throws KuduException { + + if(client.tableExists(tableName)) + return client.openTable(tableName); + + + List<ColumnSchema> columns = new ArrayList<ColumnSchema>(); + List<String> rangeKeys = new ArrayList<String>(); // Primary key + rangeKeys.add(fieldsNames[0]); + + logger.info("Creating the table \"" + tableName + "\"..."); + for (int i = 0; i < fieldsNames.length; i++){ + ColumnSchema col; + String colName = fieldsNames[i]; + Type colType = getRowsPositionType(i, row); + + if (colName.equals(fieldsNames[0])) { + col = new ColumnSchemaBuilder(colName, colType).key(true).build(); + columns.add(0, col);//To create the table, the key must be the first in the column list otherwise it will give a failure + } else { + col = new ColumnSchemaBuilder(colName, colType).build(); + columns.add(col); + } + } + Schema schema = new Schema(columns); + + if(!client.tableExists(tableName)) + client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys).addHashPartitions(rangeKeys, 4)); + //logger.info("SUCCESS: The table has been created successfully"); + + + return client.openTable(tableName); + } + /** + * Delete the indicated table + * + * @param tableName name table to delete + */ + public void deleteTable (String tableName){ + + logger.info("Deleting the table \"" + tableName + "\"..."); + try { + 
if(client.tableExists(tableName)) { + client.deleteTable(tableName); + logger.info("SUCCESS: Table deleted successfully"); + } + } catch (KuduException e) { + logger.error("The table \"" + tableName +"\" doesn't exist, so can't be deleted.", e); + } + } + + /** + * Return the type of the value of the position "pos", like the class object "Type" + * + * @param pos Row position + * @param row list of values to insert a row in the table + * @return element type "pos"-esimo of "row" + */ + public Type getRowsPositionType (int pos, RowSerializable row){ + Type colType = null; + switch(row.productElement(pos).getClass().getName()){ + case "java.lang.String": + colType = Type.STRING; + break; + case "java.lang.Integer": + colType = Type.INT32; + break; + case "java.lang.Boolean": + colType = Type.BOOL; + break; + default: + break; + } + return colType; + } + + /** + * Return a list with all rows of the indicated table + * + * @param tableName Table name to read + * @return List of rows in the table(object Row) + * @throws KuduException In case of exception caused by Kudu + */ + public List<RowSerializable> readTable (String tableName) throws KuduException { + + KuduTable table = client.openTable(tableName); + KuduScanner scanner = client.newScannerBuilder(table).build(); + //Obtain the column name list + String[] columnsNames = getNamesOfColumns(table); + //The list return all rows + List<RowSerializable> rowsList = new ArrayList<>(); + + int posRow = 0; + while (scanner.hasMoreRows()) { + for (RowResult row : scanner.nextRows()) { //Get the rows + RowSerializable rowToInsert = new RowSerializable(columnsNames.length); + for (String col : columnsNames) { //For each column, it's type determined and this is how to read it + + String colType = row.getColumnType(col).getName(); + switch (colType) { + case "string": + rowToInsert.setField(posRow, row.getString(col)); + posRow++; + break; + case "int32": + rowToInsert.setField(posRow, row.getInt(col)); + posRow++; + break; + 
case "bool": + rowToInsert.setField(posRow, row.getBoolean(col)); + posRow++; + break; + default: + break; + } + } + rowsList.add(rowToInsert); + posRow = 0; + } + } + return rowsList; + } + + + + /** + * Return a list with all rows of the indicated table + * + * @param tableName Table name to read + * @throws KuduException In case of exception caused by Kudu + */ + public void readTablePrint (String tableName) throws KuduException { + KuduTable table = client.openTable(tableName); + KuduScanner scanner = client.newScannerBuilder(table).build(); + int cont = 0; + try { + while (scanner.hasMoreRows()) { + RowResultIterator results = scanner.nextRows(); + while (results.hasNext()) { + RowResult result = results.next(); + System.out.println(result.rowToString()); + cont++; + } + } + System.out.println("Number of rows: " + cont); + } catch (Exception e) { --- End diff -- The exception handling needs improvement > Kudu connector to read/write from/to Kudu > ----------------------------------------- > > Key: BAHIR-99 > URL: https://issues.apache.org/jira/browse/BAHIR-99 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors > Affects Versions: Flink-1.0 > Reporter: Rubén Casado > Assignee: Rubén Casado > Fix For: Flink-Next > > > Java library to integrate Apache Kudu and Apache Flink. The main goal is to be > able to read/write data from/to Kudu using Flink's DataSet and DataStream > APIs. > Data flow patterns: > Batch > - Kudu -> DataSet<RowSerializable> -> Kudu > - Kudu -> DataSet<RowSerializable> -> other source > - Other source -> DataSet<RowSerializable> -> other source > Stream > - Other source -> DataStream <RowSerializable> -> Kudu > Code is available at https://github.com/rubencasado/Flink-Kudu -- This message was sent by Atlassian JIRA (v6.4.14#64029)