Repository: tajo Updated Branches: refs/heads/index_support e38c48aa8 -> 347571584
TAJO-1135: Implement queryable virtual table for cluster information Closes #366 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6e519bcf Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6e519bcf Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6e519bcf Branch: refs/heads/index_support Commit: 6e519bcf39a526b64e62d9957404bfd8a3888486 Parents: 1617fa9 Author: Jihun Kang <[email protected]> Authored: Fri Mar 6 16:32:37 2015 +0900 Committer: Jihun Kang <[email protected]> Committed: Fri Mar 6 16:32:37 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../dictionary/ClusterTableDescriptor.java | 56 ++++++++ .../InfoSchemaMetadataDictionary.java | 5 +- .../NonForwardQueryResultSystemScanner.java | 142 +++++++++++++++++++ .../TestNonForwardQueryResultSystemScanner.java | 18 +++ 5 files changed, 222 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1570e42..974e4d7 100644 --- a/CHANGES +++ b/CHANGES @@ -4,6 +4,8 @@ Release 0.11.0 - unreleased NEW FEATURES + TAJO-1135: Implement queryable virtual table for cluster information. + (jihun) IMPROVEMENT http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java new file mode 100644 index 0000000..e3c830f --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.dictionary; + +import org.apache.tajo.common.TajoDataTypes.Type; + +class ClusterTableDescriptor extends AbstractTableDescriptor { + + private static final String TABLENAME = "cluster"; + private final ColumnDescriptor[] columns = new ColumnDescriptor[] { + new ColumnDescriptor("host", Type.TEXT, 0), + new ColumnDescriptor("port", Type.INT4, 0), + new ColumnDescriptor("type", Type.TEXT, 0), + new ColumnDescriptor("status", Type.TEXT, 0), + new ColumnDescriptor("total_cpu", Type.INT4, 0), + new ColumnDescriptor("used_mem", Type.INT8, 0), + new ColumnDescriptor("total_mem", Type.INT8, 0), + new ColumnDescriptor("free_heap", Type.INT8, 0), + new ColumnDescriptor("max_heap", Type.INT8, 0), + new ColumnDescriptor("used_diskslots", Type.FLOAT4, 0), + new ColumnDescriptor("total_diskslots", Type.FLOAT4, 0), + new ColumnDescriptor("running_tasks", Type.INT4, 0), + new ColumnDescriptor("last_heartbeat_ts", Type.TIMESTAMP, 0) + }; + + public ClusterTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) { + super(metadataDictionary); + } + + @Override + public String getTableNameString() { + return TABLENAME; + } + + @Override + protected ColumnDescriptor[] getColumnDescriptors() { + return columns; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java index de79caa..0ac0a54 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.tajo.catalog.exception.NoSuchTableException; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.util.TUtil; public class InfoSchemaMetadataDictionary { @@ -39,6 +40,7 @@ public class InfoSchemaMetadataDictionary { TABLEOPTIONS, TABLESTATS, PARTITIONS, + CLUSTER, MAX_TABLE; } @@ -58,6 +60,7 @@ public class InfoSchemaMetadataDictionary { schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLEOPTIONS.ordinal(), new TableOptionsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLESTATS.ordinal(), new TableStatsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this)); + schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this)); } public boolean isSystemDatabase(String databaseName) { @@ -119,6 +122,6 @@ public class InfoSchemaMetadataDictionary { } protected String getTablePath() { - return "SYSTEM"; + return StoreType.SYSTEM.name().toUpperCase(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 93909d1..e44d8be 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -20,7 +20,10 @@ package org.apache.tajo.master.exec; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.Stack; import org.apache.commons.logging.Log; @@ -56,6 +59,8 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.plan.InvalidQueryException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; @@ -431,6 +436,141 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult return tuples; } + private Tuple getQueryMasterTuple(Schema outSchema, Worker aWorker) { + List<Column> columns = outSchema.getColumns(); + Tuple aTuple = new VTuple(outSchema.size()); + WorkerResource aResource = aWorker.getResource(); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("host".equalsIgnoreCase(column.getSimpleName())) { + if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) { + aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("port".equalsIgnoreCase(column.getSimpleName())) { + if (aWorker.getConnectionInfo() != null) { + aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getQueryMasterPort())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("type".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText("QueryMaster")); + } else if ("status".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString())); + } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) { + if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumQueryMasterTasks())); + } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap())); + } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap())); + } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) { + if (aWorker.getLastHeartbeatTime() > 0) { + aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } + + return aTuple; + } + + private Tuple getWorkerTuple(Schema outSchema, Worker aWorker) { + List<Column> columns = outSchema.getColumns(); + Tuple aTuple = new VTuple(outSchema.size()); + WorkerResource aResource = aWorker.getResource(); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("host".equalsIgnoreCase(column.getSimpleName())) { + if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) { + aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("port".equalsIgnoreCase(column.getSimpleName())) { + if (aWorker.getConnectionInfo() != null) { + aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getPeerRpcPort())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } else if ("type".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText("Worker")); + } else if ("status".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString())); + } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) { + if ("total_cpu".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(aResource.getCpuCoreSlots())); + } else if ("used_mem".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(aResource.getUsedMemoryMB()*1048576l)); + } else if ("total_mem".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMemoryMB()*1048576l)); + } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap())); + } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap())); + } else if ("used_diskslots".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getUsedDiskSlots())); + } else if ("total_diskslots".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getDiskSlots())); + } else if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumRunningTasks())); + } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) { + if (aWorker.getLastHeartbeatTime() > 0) { + aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime())); + } else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } + } + else { + aTuple.put(fieldId, DatumFactory.createNullDatum()); + } + } + + return aTuple; + } + + private List<Tuple> getClusterInfo(Schema outSchema) { + Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers(); + Set<Integer> keySet = workerMap.keySet(); + List<Tuple> tuples = Collections.emptyList(); + List<Worker> queryMasterList = new ArrayList<Worker>(); + List<Worker> workerList = new ArrayList<Worker>(); + + for (Integer keyId: keySet) { + Worker aWorker = workerMap.get(keyId); + WorkerResource aResource = aWorker.getResource(); + + if (aResource.isQueryMasterMode()) { + queryMasterList.add(aWorker); + } + + if (aResource.isTaskRunnerMode()) { + workerList.add(aWorker); + } + } + + tuples = new ArrayList<Tuple>(queryMasterList.size() + workerList.size()); + for (Worker queryMaster: queryMasterList) { + tuples.add(getQueryMasterTuple(outSchema, queryMaster)); + } + + for (Worker worker: workerList) { + tuples.add(getWorkerTuple(outSchema, worker)); + } + + return tuples; + } + private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) { List<Tuple> tuples = null; String tableName = CatalogUtil.extractSimpleName(tableDesc.getName()); @@ -451,6 +591,8 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult tuples = getAllTableStats(inSchema); } else if ("partitions".equalsIgnoreCase(tableName)) { tuples = getAllPartitions(inSchema); + } else if ("cluster".equalsIgnoreCase(tableName)) { + tuples = getClusterInfo(inSchema); } return tuples; http://git-wip-us.apache.org/repos/asf/tajo/blob/6e519bcf/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java index fa7fdf0..01d4ec4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -295,4 +295,22 @@ public class TestNonForwardQueryResultSystemScanner { assertThat(tuples.size(), is(9)); assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem")))); } + + @Test + public void testGetClusterDetails() throws Exception { + NonForwardQueryResultScanner queryResultScanner = + getScanner("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER"); + + queryResultScanner.init(); + + List<ByteString> rowBytes = queryResultScanner.getNextRows(100); + + assertThat(rowBytes.size(), is(2)); + + RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); + List<Tuple> tuples = getTupleList(decoder, rowBytes); + + assertThat(tuples.size(), is(2)); + assertThat(tuples, hasItem(getTupleMatcher(0, is("QueryMaster")))); + } }
