This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new c5e3c999ad PHOENIX-7223 Make Sure Tools Always Close HBase Connections
on Exit
c5e3c999ad is described below
commit c5e3c999ad1bed9b5bdafd3f02533e5b0e6aa307
Author: Istvan Toth <[email protected]>
AuthorDate: Tue Feb 20 13:01:19 2024 +0100
PHOENIX-7223 Make Sure Tools Always Close HBase Connections on Exit
* Don't throw exceptions from Tools; log the error and return a non-zero exit code
* Close all Phoenix Connections in Tools
* Close cached ConnectionQueryServices (CQSI) objects in PhoenixDriver.close()
---
.../jdbc/ClusterRoleRecordGeneratorTool.java | 19 ++-
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 13 +-
.../apache/phoenix/jdbc/PhoenixHAAdminTool.java | 63 ++++----
.../org/apache/phoenix/schema/tool/SchemaTool.java | 29 ++--
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 159 ++++++++++++---------
.../apache/phoenix/mapreduce/OrphanViewTool.java | 2 +
.../phoenix/mapreduce/index/IndexUpgradeTool.java | 11 +-
.../phoenix/schema/stats/UpdateStatisticsTool.java | 19 ++-
.../util/MergeViewIndexIdSequencesTool.java | 17 +--
.../apache/phoenix/end2end/CsvBulkLoadToolIT.java | 41 +++---
.../phoenix/end2end/RegexBulkLoadToolIT.java | 20 +--
11 files changed, 221 insertions(+), 172 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
index 49ec3db61a..93899f87a2 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java
@@ -66,13 +66,18 @@ public class ClusterRoleRecordGeneratorTool extends
Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- String fileName = getConf().get(PHOENIX_HA_GENERATOR_FILE_ATTR);
- File file = StringUtils.isEmpty(fileName)
- ? File.createTempFile("phoenix.ha.cluster.role.records",
".json")
- : new File(fileName);
- JacksonUtil.getObjectWriterPretty().writeValue(file,
listAllRecordsByZk());
- System.out.println("Created JSON file '" + file + "'");
- return 0;
+ try {
+ String fileName = getConf().get(PHOENIX_HA_GENERATOR_FILE_ATTR);
+ File file = StringUtils.isEmpty(fileName)
+ ? File.createTempFile("phoenix.ha.cluster.role.records",
".json")
+ : new File(fileName);
+ JacksonUtil.getObjectWriterPretty().writeValue(file,
listAllRecordsByZk());
+ System.out.println("Created JSON file '" + file + "'");
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return -1;
+ }
}
List<ClusterRoleRecord> listAllRecordsByZk() throws IOException {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index ca412d5238..8bdc6ea182 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -144,6 +144,7 @@ public final class PhoenixDriver extends
PhoenixEmbeddedDriver {
}
// One entry per cluster here
+ // TODO that's not true, we can have multiple connections with different
configs / principals
private final Cache<ConnectionInfo, ConnectionQueryServices>
connectionQueryServicesCache =
initializeConnectionCache();
@@ -341,8 +342,18 @@ public final class PhoenixDriver extends
PhoenixEmbeddedDriver {
services = null;
}
}
+
+ if (connectionQueryServicesCache != null) {
+ try {
+ for (ConnectionQueryServices cqsi :
connectionQueryServicesCache.asMap().values()) {
+ cqsi.close();
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close ConnectionQueryServices
instance", e);
+ }
+ }
}
-
+
private enum LockMode {
READ, WRITE
};
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
index c6bdadc335..e7a9cd7a22 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java
@@ -105,38 +105,43 @@ public class PhoenixHAAdminTool extends Configured
implements Tool {
return RET_ARGUMENT_ERROR;
}
- if (commandLine.hasOption(HELP_OPT.getOpt())) {
- printUsageMessage();
- return RET_SUCCESS;
- } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
- String zkUrl = getLocalZkUrl(getConf()); // Admin is created
against local ZK cluster
- try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl,
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
- List<ClusterRoleRecord> records =
admin.listAllClusterRoleRecordsOnZookeeper();
- JacksonUtil.getObjectWriterPretty().writeValue(System.out,
records);
- }
- } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { // create
or update
- String fileName =
commandLine.getOptionValue(MANIFEST_OPT.getOpt());
- List<ClusterRoleRecord> records = readRecordsFromFile(fileName);
- boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt());
- Map<String, List<String>> failedHaGroups =
syncClusterRoleRecords(records, forceful);
- if (!failedHaGroups.isEmpty()) {
- System.out.println("Found following HA groups are failing to
write the clusters:");
- failedHaGroups.forEach((k, v) ->
- System.out.printf("%s -> [%s]\n", k, String.join(",",
v)));
- return RET_SYNC_ERROR;
- }
- } else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { // verify
and repair
- String zkUrl = getLocalZkUrl(getConf()); // Admin is created
against local ZK cluster
- try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl,
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
- List<String> inconsistentRecord =
admin.verifyAndRepairWithRemoteZnode();
- if (!inconsistentRecord.isEmpty()) {
- System.out.println("Found following inconsistent cluster
role records: ");
- System.out.print(String.join(",", inconsistentRecord));
- return RET_REPAIR_FOUND_INCONSISTENCIES;
+ try {
+ if (commandLine.hasOption(HELP_OPT.getOpt())) {
+ printUsageMessage();
+ return RET_SUCCESS;
+ } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
+ String zkUrl = getLocalZkUrl(getConf()); // Admin is created
against local ZK cluster
+ try (PhoenixHAAdminHelper admin = new
PhoenixHAAdminHelper(zkUrl, getConf(),
HighAvailibilityCuratorProvider.INSTANCE)) {
+ List<ClusterRoleRecord> records =
admin.listAllClusterRoleRecordsOnZookeeper();
+ JacksonUtil.getObjectWriterPretty().writeValue(System.out,
records);
+ }
+ } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { //
create or update
+ String fileName =
commandLine.getOptionValue(MANIFEST_OPT.getOpt());
+ List<ClusterRoleRecord> records =
readRecordsFromFile(fileName);
+ boolean forceful =
commandLine.hasOption(FORCEFUL_OPT.getOpt());
+ Map<String, List<String>> failedHaGroups =
syncClusterRoleRecords(records, forceful);
+ if (!failedHaGroups.isEmpty()) {
+ System.out.println("Found following HA groups are failing
to write the clusters:");
+ failedHaGroups.forEach((k, v) ->
+ System.out.printf("%s -> [%s]\n", k,
String.join(",", v)));
+ return RET_SYNC_ERROR;
+ }
+ } else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { //
verify and repair
+ String zkUrl = getLocalZkUrl(getConf()); // Admin is created
against local ZK cluster
+ try (PhoenixHAAdminHelper admin = new
PhoenixHAAdminHelper(zkUrl, getConf(),
HighAvailibilityCuratorProvider.INSTANCE)) {
+ List<String> inconsistentRecord =
admin.verifyAndRepairWithRemoteZnode();
+ if (!inconsistentRecord.isEmpty()) {
+ System.out.println("Found following inconsistent
cluster role records: ");
+ System.out.print(String.join(",", inconsistentRecord));
+ return RET_REPAIR_FOUND_INCONSISTENCIES;
+ }
}
}
+ return RET_SUCCESS;
+ } catch(Exception e ) {
+ e.printStackTrace();
+ return -1;
}
- return RET_SUCCESS;
}
/**
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java
index 6bc4922f90..f000b98064 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/tool/SchemaTool.java
@@ -66,19 +66,24 @@ public class SchemaTool extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- populateToolAttributes(args);
- SchemaProcessor processor=null;
- if(Mode.SYNTH.equals(mode)) {
- processor = new SchemaSynthesisProcessor(ddlFile);
- } else if(Mode.EXTRACT.equals(mode)) {
- conf = HBaseConfiguration.addHbaseResources(getConf());
- processor = new SchemaExtractionProcessor(tenantId, conf,
pSchemaName, pTableName);
- } else {
- throw new Exception(mode+" is not accepted, provide [synth or
extract]");
+ try {
+ populateToolAttributes(args);
+ SchemaProcessor processor=null;
+ if(Mode.SYNTH.equals(mode)) {
+ processor = new SchemaSynthesisProcessor(ddlFile);
+ } else if(Mode.EXTRACT.equals(mode)) {
+ conf = HBaseConfiguration.addHbaseResources(getConf());
+ processor = new SchemaExtractionProcessor(tenantId, conf,
pSchemaName, pTableName);
+ } else {
+ throw new Exception(mode+" is not accepted, provide [synth or
extract]");
+ }
+ output = processor.process();
+ LOGGER.info("Effective DDL with " + mode.toString() +": " +
output);
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return -1;
}
- output = processor.process();
- LOGGER.info("Effective DDL with " + mode.toString() +": " + output);
- return 0;
}
public String getOutput() {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 70039aa808..142aea63b7 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -182,7 +182,12 @@ public abstract class AbstractBulkLoadTool extends
Configured implements Tool {
} catch (IllegalStateException e) {
printHelpAndExit(e.getMessage(), getOptions());
}
- return loadData(conf, cmdLine);
+ try {
+ return loadData(conf, cmdLine);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return -1;
+ }
}
@@ -215,85 +220,99 @@ public abstract class AbstractBulkLoadTool extends
Configured implements Tool {
PhoenixTextInputFormat.setSkipHeader(conf);
}
- final Connection conn = QueryUtil.getConnection(conf);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Reading columns from {} :: {}", ((PhoenixConnection)
conn).getURL(),
- qualifiedTableName);
- }
- List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine,
qualifiedTableName);
- Preconditions.checkNotNull(importColumns);
- Preconditions.checkArgument(!importColumns.isEmpty(), "Column info
list is empty");
- FormatToBytesWritableMapper.configureColumnInfoList(conf,
importColumns);
- boolean ignoreInvalidRows =
cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
-
conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY,
ignoreInvalidRows);
- conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY,
- SchemaUtil.getEscapedFullTableName(qualifiedTableName));
- // give subclasses their hook
- configureOptions(cmdLine, importColumns, conf);
- String sName = SchemaUtil.normalizeIdentifier(schemaName);
- String tName = SchemaUtil.normalizeIdentifier(tableName);
-
- String tn = SchemaUtil.getEscapedTableName(sName, tName);
- ResultSet rsempty = conn.createStatement().executeQuery("SELECT * FROM
" + tn + " LIMIT 1");
- boolean tableNotEmpty = rsempty.next();
- rsempty.close();
-
- try {
- validateTable(conn, sName, tName);
- } finally {
- conn.close();
- }
-
final String inputPaths =
cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt());
final Path outputPath;
- if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
- outputPath = new
Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
- } else {
- outputPath = new Path("/tmp/" + UUID.randomUUID());
- }
-
List<TargetTableRef> tablesToBeLoaded = new
ArrayList<TargetTableRef>();
- PTable table =
conn.unwrap(PhoenixConnection.class).getTable(qualifiedTableName);
- tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName,
table.getPhysicalName().getString()));
boolean hasLocalIndexes = false;
- boolean hasGlobalIndexes = false;
- for(PTable index: table.getIndexes()) {
- if (index.getIndexType() == IndexType.LOCAL) {
- hasLocalIndexes =
- qualifiedIndexTableName == null ? true :
index.getTableName().getString()
- .equals(qualifiedIndexTableName);
- if (hasLocalIndexes && hasGlobalIndexes) break;
+
+ try (Connection conn = QueryUtil.getConnection(conf)) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Reading columns from {} :: {}",
((PhoenixConnection) conn).getURL(),
+ qualifiedTableName);
+ }
+ List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine,
qualifiedTableName);
+ Preconditions.checkNotNull(importColumns);
+ Preconditions.checkArgument(!importColumns.isEmpty(), "Column info
list is empty");
+ FormatToBytesWritableMapper.configureColumnInfoList(conf,
importColumns);
+ boolean ignoreInvalidRows =
cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
+
conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY,
+ ignoreInvalidRows);
+ conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY,
+ SchemaUtil.getEscapedFullTableName(qualifiedTableName));
+ // give subclasses their hook
+ configureOptions(cmdLine, importColumns, conf);
+ String sName = SchemaUtil.normalizeIdentifier(schemaName);
+ String tName = SchemaUtil.normalizeIdentifier(tableName);
+
+ String tn = SchemaUtil.getEscapedTableName(sName, tName);
+ ResultSet rsempty =
+ conn.createStatement().executeQuery("SELECT * FROM " + tn
+ " LIMIT 1");
+ boolean tableNotEmpty = rsempty.next();
+ rsempty.close();
+
+ try {
+ validateTable(conn, sName, tName);
+ } finally {
+ conn.close();
}
- if (IndexUtil.isGlobalIndex(index)) {
- hasGlobalIndexes = true;
- if (hasLocalIndexes && hasGlobalIndexes) break;
+
+ if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
+ outputPath = new
Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
+ } else {
+ outputPath = new Path("/tmp/" + UUID.randomUUID());
}
- }
- if(hasGlobalIndexes && tableNotEmpty &&
!cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())){
- throw new IllegalStateException("Bulk Loading error: Bulk loading
is disabled for non" +
- " empty tables with global indexes, because it will
corrupt the global index table in most cases.\n" +
- "Use the --corruptindexes option to override this check.");
- }
+ PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
+ tablesToBeLoaded.add(
+ new TargetTableRef(qualifiedTableName,
table.getPhysicalName().getString()));
+ boolean hasGlobalIndexes = false;
+ for (PTable index : table.getIndexes()) {
+ if (index.getIndexType() == IndexType.LOCAL) {
+ hasLocalIndexes =
+ qualifiedIndexTableName == null ? true
+ : index.getTableName().getString()
+ .equals(qualifiedIndexTableName);
+ if (hasLocalIndexes && hasGlobalIndexes) {
+ break;
+ }
+ }
+ if (IndexUtil.isGlobalIndex(index)) {
+ hasGlobalIndexes = true;
+ if (hasLocalIndexes && hasGlobalIndexes) {
+ break;
+ }
+ }
+ }
- // using conn after it's been closed... o.O
- tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName));
+ if (hasGlobalIndexes && tableNotEmpty
+ && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())) {
+ throw new IllegalStateException(
+ "Bulk Loading error: Bulk loading is disabled for non"
+ + " empty tables with global indexes, because
it will corrupt"
+ + " the global index table in most cases.\n"
+ + "Use the --corruptindexes option to override
this check.");
+ }
- // When loading a single index table, check index table name is correct
- if (qualifiedIndexTableName != null){
- TargetTableRef targetIndexRef = null;
- for (TargetTableRef tmpTable : tablesToBeLoaded){
- if
(tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
- targetIndexRef = tmpTable;
- break;
+ // using conn after it's been closed... o.O
+ tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName));
+
+ // When loading a single index table, check index table name is
correct
+ if (qualifiedIndexTableName != null) {
+ TargetTableRef targetIndexRef = null;
+ for (TargetTableRef tmpTable : tablesToBeLoaded) {
+ if (tmpTable.getLogicalName()
+ .compareToIgnoreCase(qualifiedIndexTableName) ==
0) {
+ targetIndexRef = tmpTable;
+ break;
+ }
}
+ if (targetIndexRef == null) {
+ throw new IllegalStateException("Bulk Loader error: index
table "
+ + qualifiedIndexTableName + " doesn't exist");
+ }
+ tablesToBeLoaded.clear();
+ tablesToBeLoaded.add(targetIndexRef);
}
- if (targetIndexRef == null){
- throw new IllegalStateException("Bulk Loader error: index
table " +
- qualifiedIndexTableName + " doesn't exist");
- }
- tablesToBeLoaded.clear();
- tablesToBeLoaded.add(targetIndexRef);
}
return submitJob(conf, tableName, inputPaths, outputPath,
tablesToBeLoaded, hasLocalIndexes);
@@ -449,7 +468,7 @@ public abstract class AbstractBulkLoadTool extends
Configured implements Tool {
*/
private List<TargetTableRef> getIndexTables(Connection conn, String
qualifiedTableName)
throws SQLException {
- PTable table =
conn.unwrap(PhoenixConnection.class).getTable(qualifiedTableName);
+ PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
for(PTable indexTable : table.getIndexes()){
indexTables.add(new
TargetTableRef(indexTable.getName().getString(), indexTable
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 2d91bce07e..736780ac6e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -432,6 +432,7 @@ public class OrphanViewTool extends Configured implements
Tool {
}
} finally {
if (newConn) {
+ // TODO can this be rewritten with try-with-resources ?
tryClosingConnection(tenantConnection);
}
}
@@ -949,6 +950,7 @@ public class OrphanViewTool extends Configured implements
Tool {
ExceptionUtils.getStackTrace(ex));
return -1;
} finally {
+ // TODO use try-with-resources at least for the Connection ?
closeConnectionAndFiles(connection);
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
index 96e322154f..ed72a1e783 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexUpgradeTool.java
@@ -206,9 +206,14 @@ public class IndexUpgradeTool extends Configured
implements Tool {
} catch (IllegalStateException e) {
printHelpAndExit(e.getMessage(), getOptions());
}
- initializeTool(cmdLine);
- prepareToolSetup();
- executeTool();
+ try {
+ initializeTool(cmdLine);
+ prepareToolSetup();
+ executeTool();
+ } catch (Exception e) {
+ e.printStackTrace();
+ hasFailure = true;
+ }
if (hasFailure) {
return -1;
} else {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java
index ffe7ed5f5d..3d9837215b 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java
@@ -96,13 +96,18 @@ public class UpdateStatisticsTool extends Configured
implements Tool {
@Override
public int run(String[] args) throws Exception {
- parseArgs(args);
- preJobTask();
- configureJob();
- TableMapReduceUtil.initCredentials(job);
- int ret = runJob();
- postJobTask();
- return ret;
+ try {
+ parseArgs(args);
+ preJobTask();
+ configureJob();
+ TableMapReduceUtil.initCredentials(job);
+ int ret = runJob();
+ postJobTask();
+ return ret;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return -1;
+ }
}
/**
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java
index 0bb0fb8549..3eb62ea086 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/util/MergeViewIndexIdSequencesTool.java
@@ -91,25 +91,16 @@ public class MergeViewIndexIdSequencesTool extends
Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
int status = 0;
- PhoenixConnection conn = null;
- try {
- parseOptions(args);
-
- final Configuration config =
HBaseConfiguration.addHbaseResources(getConf());
-
- conn = ConnectionUtil.getInputConnection(config).
- unwrap(PhoenixConnection.class);
+ parseOptions(args);
+ final Configuration config =
HBaseConfiguration.addHbaseResources(getConf());
+ try (PhoenixConnection conn =
ConnectionUtil.getInputConnection(config).
+ unwrap(PhoenixConnection.class)) {
UpgradeUtil.mergeViewIndexIdSequences(conn);
-
} catch (Exception e) {
LOGGER.error("Get an error while running
MergeViewIndexIdSequencesTool, "
+ e.getMessage());
status = 1;
- } finally {
- if (conn != null) {
- conn.close();
- }
}
return status;
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 4d45568b9f..86acbe1df3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.phoenix.end2end.index.IndexTestUtil;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
@@ -151,12 +149,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
"--table", "table1",
"--schema", "s",
"--zookeeper", zkQuorum});
- fail("Bulk loading error should have happened earlier");
- } catch (Exception e){
- assertTrue(e.getMessage().contains("Bulk Loading error: Bulk
loading is disabled for " +
- "non empty tables with global indexes, because it will
corrupt " +
- "the global index table in most cases.\n" +
- "Use the --corruptindexes option to override this
check."));
+ assertTrue("Bulk loading error should have happened earlier",
exitCode != 0);
+ } catch (Exception e) {
+ fail("Tools should return non-zero exit codes on failure"
+ + " instead of throwing an exception");
}
ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1
ORDER BY id");
@@ -398,7 +394,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
+ " (FIRST_NAME ASC)"
+ " INCLUDE (LAST_NAME)";
stmt.execute(ddl);
-
+
FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new
Path("/tmp/input3.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
@@ -598,17 +594,17 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
csvBulkLoadTool.setConf(getUtility().getConfiguration());
try {
- csvBulkLoadTool.run(new String[] {
+ int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input4.csv",
"--table", tableName,
"--zookeeper", zkQuorum });
- fail(String.format("Table %s not created, hence should
fail",tableName));
+ assertTrue(String.format("Table %s not created, hence should
fail", tableName),
+ exitCode != 0);
} catch (Exception ex) {
- assertTrue(ex instanceof IllegalArgumentException);
- assertTrue(ex.getMessage().contains(String.format("Table %s not
found", tableName)));
- }
+ fail("Tools should return non-zero exit codes on failure"
+ + " instead of throwing an exception"); }
}
-
+
@Test
public void testAlreadyExistsOutputPath() {
String tableName = "TABLE9";
@@ -617,7 +613,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL
PRIMARY KEY, "
+ "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
-
+
FileSystem fs = FileSystem.get(getUtility().getConfiguration());
fs.create(new Path(outputPath));
FSDataOutputStream outputStream = fs.create(new
Path("/tmp/input9.csv"));
@@ -625,18 +621,21 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
printWriter.println("1,FirstName 1,LastName 1");
printWriter.println("2,FirstName 2,LastName 2");
printWriter.close();
-
+
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
csvBulkLoadTool.setConf(getUtility().getConfiguration());
- csvBulkLoadTool.run(new String[] {
+ int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input9.csv",
"--output", outputPath,
"--table", tableName,
"--zookeeper", zkQuorum });
-
- fail(String.format("Output path %s already exists. hence, should
fail",outputPath));
+
+ assertTrue(
+ String.format("Output path %s already exists. hence, should
fail", outputPath),
+ exitCode != 0);
} catch (Exception ex) {
- assertTrue(ex instanceof FileAlreadyExistsException);
+ fail("Tools should return non-zero exit codes when fail,"
+ + " instead of throwing an exception");
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
index 5c66e125f7..e96f97cd32 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
@@ -33,7 +33,6 @@ import java.sql.Statement;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.phoenix.mapreduce.RegexBulkLoadTool;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -305,15 +304,16 @@ public class RegexBulkLoadToolIT extends BaseOwnClusterIT
{
RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
regexBulkLoadTool.setConf(getUtility().getConfiguration());
try {
- regexBulkLoadTool.run(new String[] {
+ int exitCode = regexBulkLoadTool.run(new String[] {
"--input", "/tmp/input4.csv",
"--table", tableName,
"--regex", "([^,]*),([^,]*),([^,]*)",
"--zookeeper", zkQuorum });
- fail(String.format("Table %s not created, hence should
fail",tableName));
+ assertTrue(String.format("Table %s not created, hence should
fail", tableName),
+ exitCode != 0);
} catch (Exception ex) {
- assertTrue(ex instanceof IllegalArgumentException);
- assertTrue(ex.getMessage().contains(String.format("Table %s not
found", tableName)));
+ fail("Tools should return non-zero exit codes on failure"
+ + " instead of throwing an exception");
}
}
@@ -336,16 +336,18 @@ public class RegexBulkLoadToolIT extends BaseOwnClusterIT
{
RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
regexBulkLoadTool.setConf(getUtility().getConfiguration());
- regexBulkLoadTool.run(new String[] {
+ int exitCode = regexBulkLoadTool.run(new String[] {
"--input", "/tmp/input9.csv",
"--output", outputPath,
"--table", tableName,
"--regex", "([^,]*),([^,]*),([^,]*)",
"--zookeeper", zkQuorum });
-
- fail(String.format("Output path %s already exists. hence, should
fail",outputPath));
+ assertTrue(
+ String.format("Output path %s already exists. hence, should
fail", outputPath),
+ exitCode != 0);
} catch (Exception ex) {
- assertTrue(ex instanceof FileAlreadyExistsException);
+ fail("Tools should return non-zero exit codes on failure"
+ + " instead of throwing an exception");
}
}