[ https://issues.apache.org/jira/browse/BAHIR-99?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097673#comment-16097673 ]
ASF GitHub Bot commented on BAHIR-99: ------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/bahir-flink/pull/17#discussion_r128921995 --- Diff: flink-connector-kudu/src/main/java/es/accenture/flink/Utils/Utils.java --- @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package es.accenture.flink.Utils; + +import es.accenture.flink.Sink.KuduOutputFormat; +import es.accenture.flink.Utils.Exceptions.KuduClientException; +import es.accenture.flink.Utils.Exceptions.KuduTableException; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.*; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class Utils { + + //Kudu variables + private KuduClient client; + private KuduSession session; + + // LOG4J + private final static Logger logger = Logger.getLogger(Utils.class); + + /** + * Builder Util Class which creates a Kudu client and log in to be able to perform operations later + * @param host Kudu's host + * @throws KuduClientException In case of exception caused by Kudu Client + */ + public Utils(String host) throws KuduClientException { + this.client = new KuduClient.KuduClientBuilder(host).build(); + if (client == null){ + throw new KuduClientException("ERROR: param \"host\" not valid, can't establish connection"); + } + this.session = this.client.newSession(); + } + + /** + * Return an instance of the table indicated in the settings + * + * In case that the table exists, return an instance of the table + * In case that the table doesn't exist, create a new table with the data provided and return an instance + * In both cases,takes into account the way of the table to perfom some operations or others + * + * If the mode is CREATE: + * + * If the table exists: return error (Can not create table that already exists) + * If the table doesn't exist and the list of column names has not been provided: return error + * If the table doesn't exist and the list of column names has been provided: create a new table with data provided and return an instance + * + * If the mode is APPEND: + * + * If the table exists: return the instance in the table + * If the 
table doesn't exist: return error + * + * If the mode is OVERRIDE: + * + * If the table exist: delete all rows of this table and return an instance of it + * If the table doesn't exist: return error + * + * + * @param tableName Table name to use + * @param tableMode Operations mode for operate with the table (CREATE, APPEND, OVERRIDE) + * @return Instance of the table indicated + * @throws KuduTableException In case of can't access to a table o can't create it (wrong params or not existing table) + * @throws KuduException In case of error of Kudu + */ + public KuduTable useTable(String tableName, Integer tableMode) throws KuduTableException, KuduException { + KuduTable table; + + if (tableMode == KuduOutputFormat.CREATE) { + logger.error("Bad call method, use useTable(String tableName, String [] fieldsNames, RowSerializable row) instead"); + table = null; + }else if (tableMode == KuduOutputFormat.APPEND) { + logger.info("Modo APPEND"); + try { + if (client.tableExists(tableName)) { + //logger.info("SUCCESS: There is the table with the name \"" + tableName + "\""); + table = client.openTable(tableName); + } else { + logger.error("ERROR: The table doesn't exist"); + throw new KuduTableException("ERROR: The table doesn't exist, so can't do APPEND operation"); + } + } catch (Exception e) { + throw new KuduTableException("ERROR: param \"host\" not valid, can't establish connection"); + } + }else if (tableMode == KuduOutputFormat.OVERRIDE) { + logger.info("Modo OVERRIDE"); + try { + if (client.tableExists(tableName)) { + logger.info("SUCCESS: There is the table with the name \"" + tableName + "\". 
Emptying the table"); + clearTable(tableName); + table = client.openTable(tableName); + } else { + logger.error("ERROR: The table doesn't exist"); + throw new KuduTableException("ERROR: The table doesn't exist, so can't do OVERRIDE operation"); + } + } catch (Exception e) { + throw new KuduTableException("ERROR: param \"host\" not valid, can't establish connection"); + } + }else { + throw new KuduTableException("ERROR: Incorrect parameters, please check the constructor method. Incorrect \"tableMode\" parameter."); + } + return table; + } + + /** + * Returns an instance of the table requested in parameters + * If the table exists, returns an instance of the table + * If the table doesn't exist, creates a new table with the data provided and returns an instance + * + * @param tableName Table name to use + * @param fieldsNames List of names of columns of the table (to create table) + * @param row List of values to insert a row in the table (to know the types of columns) + * @return Instance of the table indicated + * @throws IllegalArgumentException In case of wrong parameters + * @throws KuduException In case of exception caused by Kudu + */ + public KuduTable useTable(String tableName, String [] fieldsNames, RowSerializable row) throws IllegalArgumentException, KuduException { + KuduTable table; + + if (client.tableExists(tableName)){ + logger.info("The table exists"); + table = client.openTable(tableName); + } else { + if (tableName == null || tableName.equals("")) { + throw new IllegalArgumentException("ERROR: Incorrect parameters, please check the constructor method. Incorrect \"tableName\" parameter."); + + } else if (fieldsNames == null || fieldsNames[0].isEmpty()) { + throw new IllegalArgumentException("ERROR: Incorrect parameters, please check the constructor method. Missing \"fields\" parameter."); + + } else if (row == null){ + throw new IllegalArgumentException("ERROR: Incorrect parameters, please check the constructor method. 
Incorrect \"row\" parameter."); + + } else { + logger.info("The table doesn't exist"); + table = createTable(tableName, fieldsNames, row); + } + } + return table; + } + /** + * Create a new Kudu table and return the instance of this table + * + * @param tableName name of the table to create + * @param fieldsNames list name columns of the table + * @param row list of values to insert a row in the table( to know the types of columns) + * @return instance of the table indicated + * @throws KuduException In case of exception caused by Kudu + */ + public KuduTable createTable (String tableName, String [] fieldsNames, RowSerializable row) throws KuduException { + + if(client.tableExists(tableName)) + return client.openTable(tableName); + + + List<ColumnSchema> columns = new ArrayList<ColumnSchema>(); + List<String> rangeKeys = new ArrayList<String>(); // Primary key + rangeKeys.add(fieldsNames[0]); + + logger.info("Creating the table \"" + tableName + "\"..."); + for (int i = 0; i < fieldsNames.length; i++){ + ColumnSchema col; + String colName = fieldsNames[i]; + Type colType = getRowsPositionType(i, row); + + if (colName.equals(fieldsNames[0])) { + col = new ColumnSchemaBuilder(colName, colType).key(true).build(); + columns.add(0, col);//To create the table, the key must be the first in the column list otherwise it will give a failure + } else { + col = new ColumnSchemaBuilder(colName, colType).build(); + columns.add(col); + } + } + Schema schema = new Schema(columns); + + if(!client.tableExists(tableName)) + client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys).addHashPartitions(rangeKeys, 4)); + //logger.info("SUCCESS: The table has been created successfully"); + + + return client.openTable(tableName); + } + /** + * Delete the indicated table + * + * @param tableName name table to delete + */ + public void deleteTable (String tableName){ + + logger.info("Deleting the table \"" + tableName + "\"..."); + try { + 
if(client.tableExists(tableName)) { + client.deleteTable(tableName); + logger.info("SUCCESS: Table deleted successfully"); + } + } catch (KuduException e) { + logger.error("The table \"" + tableName +"\" doesn't exist, so can't be deleted.", e); + } + } + + /** + * Return the type of the value of the position "pos", like the class object "Type" + * + * @param pos Row position + * @param row list of values to insert a row in the table + * @return element type "pos"-esimo of "row" + */ + public Type getRowsPositionType (int pos, RowSerializable row){ + Type colType = null; + switch(row.productElement(pos).getClass().getName()){ + case "java.lang.String": + colType = Type.STRING; + break; + case "java.lang.Integer": + colType = Type.INT32; + break; + case "java.lang.Boolean": + colType = Type.BOOL; + break; + default: + break; + } + return colType; + } + + /** + * Return a list with all rows of the indicated table + * + * @param tableName Table name to read + * @return List of rows in the table(object Row) + * @throws KuduException In case of exception caused by Kudu + */ + public List<RowSerializable> readTable (String tableName) throws KuduException { + + KuduTable table = client.openTable(tableName); + KuduScanner scanner = client.newScannerBuilder(table).build(); + //Obtain the column name list + String[] columnsNames = getNamesOfColumns(table); + //The list return all rows + List<RowSerializable> rowsList = new ArrayList<>(); + + int posRow = 0; + while (scanner.hasMoreRows()) { + for (RowResult row : scanner.nextRows()) { //Get the rows + RowSerializable rowToInsert = new RowSerializable(columnsNames.length); + for (String col : columnsNames) { //For each column, it's type determined and this is how to read it + + String colType = row.getColumnType(col).getName(); + switch (colType) { + case "string": + rowToInsert.setField(posRow, row.getString(col)); + posRow++; + break; + case "int32": + rowToInsert.setField(posRow, row.getInt(col)); + posRow++; + break; + 
case "bool": + rowToInsert.setField(posRow, row.getBoolean(col)); + posRow++; + break; + default: + break; + } + } + rowsList.add(rowToInsert); + posRow = 0; + } + } + return rowsList; + } + + + + /** + * Return a list with all rows of the indicated table + * + * @param tableName Table name to read + * @throws KuduException In case of exception caused by Kudu + */ + public void readTablePrint (String tableName) throws KuduException { + KuduTable table = client.openTable(tableName); + KuduScanner scanner = client.newScannerBuilder(table).build(); + int cont = 0; + try { + while (scanner.hasMoreRows()) { + RowResultIterator results = scanner.nextRows(); + while (results.hasNext()) { + RowResult result = results.next(); + System.out.println(result.rowToString()); + cont++; + } + } + System.out.println("Number of rows: " + cont); + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + client.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + /** + * Returns a representation on the table screen of a table + * + * @param row row to show + * @return a string containing the data of the row indicated in the parameter + */ + public String printRow (RowSerializable row){ + String res = ""; + for(int i = 0; i< row.productArity(); i++){ + res += (row.productElement(i) + " | "); + } + return res; + } + + /** + * Deelte all rows of the table until empty + * + * @param tableName table name to empty + * @throws KuduException In case of exception caused by Kudu + */ + public void clearTable (String tableName) throws KuduException { + KuduTable table = client.openTable(tableName); + List<RowSerializable> rowsList = readTable(tableName); + + String primaryKey = table.getSchema().getPrimaryKeyColumns().get(0).getName(); + List<Delete> deletes = new ArrayList<>(); + for(RowSerializable row : rowsList){ + Delete d = table.newDelete(); + switch(getRowsPositionType(0, row).getName()){ + case "string": + d.getRow().addString(primaryKey, (String) 
row.productElement(0)); + break; + + case "int32": + d.getRow().addInt(primaryKey, (Integer) row.productElement(0)); + break; + + case "bool": + d.getRow().addBoolean(primaryKey, (Boolean) row.productElement(0)); + break; + + default: + break; + } + deletes.add(d); + } + for(Delete d : deletes){ + session.apply(d); + } + logger.info("SUCCESS: The table has been emptied successfully"); + } + + /** + * Return a list of columns names in a table + * + * @param table table instance + * @return List of column names in the table indicated in the parameter + */ + public String [] getNamesOfColumns (KuduTable table){ + List<ColumnSchema> columns = table.getSchema().getColumns(); + List<String> columnsNames = new ArrayList<>(); // List of column names + for (ColumnSchema schema : columns) { + columnsNames.add(schema.getName()); + } + String [] array = new String[columnsNames.size()]; + array = columnsNames.toArray(array); + return array; + } + + public boolean checkNamesOfColumns(String [] tableNames, String [] providedNames) throws KuduTableException{ + boolean res = false; + if(tableNames.length != providedNames.length){ + res = false; + } else{ + for (int i = 0; i < tableNames.length; i++) { + res = tableNames[i].equals(providedNames[i]) ? true : false; + } + } + if(!res){ + throw new KuduTableException("ERROR: The table column names and the provided column names don't match"); + } + return res; + } + + public void insert (KuduTable table, RowSerializable row, String [] fieldsNames) throws KuduException, NullPointerException { + + Insert insert; + try{ + insert = table.newInsert(); + } catch (NullPointerException e){ + throw new NullPointerException("Error encountered at opening/creating table"); --- End diff -- I don't think this re-throw adds any value. If this is a protection against table being null, I would do an explicit null check. 
> Kudu connector to read/write from/to Kudu > ----------------------------------------- > > Key: BAHIR-99 > URL: https://issues.apache.org/jira/browse/BAHIR-99 > Project: Bahir > Issue Type: New Feature > Components: Flink Streaming Connectors > Affects Versions: Flink-1.0 > Reporter: Rubén Casado > Assignee: Rubén Casado > Fix For: Flink-Next > > > Java library to integrate Apache Kudu and Apache Flink. The main goal is to be > able to read/write data from/to Kudu using Flink's DataSet and DataStream > APIs. > Data flow patterns: > Batch > - Kudu -> DataSet<RowSerializable> -> Kudu > - Kudu -> DataSet<RowSerializable> -> other source > - Other source -> DataSet<RowSerializable> -> other source > Stream > - Other source -> DataStream<RowSerializable> -> Kudu > Code is available at https://github.com/rubencasado/Flink-Kudu -- This message was sent by Atlassian JIRA (v6.4.14#64029)