http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java deleted file mode 100644 index 60b0c7a..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaParser.java +++ /dev/null @@ -1,204 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import static org.apache.avro.Schema.Type.BOOLEAN; -import static org.apache.avro.Schema.Type.DOUBLE; -import static org.apache.avro.Schema.Type.FLOAT; -import static org.apache.avro.Schema.Type.INT; -import static org.apache.avro.Schema.Type.LONG; -import static org.apache.avro.Schema.Type.STRING; - -import java.util.Collections; -import java.util.Hashtable; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.SchemaParseException; -import org.codehaus.jackson.JsonNode; - -import com.cloudera.impala.analysis.ColumnDef; -import com.cloudera.impala.analysis.TypeDef; -import com.cloudera.impala.catalog.ArrayType; -import com.cloudera.impala.catalog.MapType; -import com.cloudera.impala.catalog.ScalarType; -import com.cloudera.impala.catalog.StructField; -import com.cloudera.impala.catalog.StructType; -import com.cloudera.impala.catalog.Type; -import com.cloudera.impala.common.AnalysisException; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * Utility class used to parse Avro schema. Checks that the schema is valid - * and performs mapping of Avro types to Impala types. - * Note: This code is loosely based off the parsing code in the Hive AvroSerDe. - */ -public class AvroSchemaParser { - // Map of Avro to Impala primitive types. - private static final Map<Schema.Type, Type> avroToImpalaPrimitiveTypeMap_; - static { - Map<Schema.Type, Type> typeMap = new Hashtable<Schema.Type, Type>(); - typeMap.put(STRING, Type.STRING); - typeMap.put(INT, Type.INT); - typeMap.put(BOOLEAN, Type.BOOLEAN); - typeMap.put(LONG, Type.BIGINT); - typeMap.put(FLOAT, Type.FLOAT); - typeMap.put(DOUBLE, Type.DOUBLE); - avroToImpalaPrimitiveTypeMap_ = Collections.unmodifiableMap(typeMap); - } - - /** - * Parses the Avro schema string literal, mapping the Avro types to Impala types. - * Returns a list of ColumnDef objects with their name and type info set. - * Throws an AnalysisException if the Avro type maps to a type that Impala - * does not yet support. - * Throws a SchemaParseException if the Avro schema was invalid. 
- */ - public static List<ColumnDef> parse(String schemaStr) - throws SchemaParseException, AnalysisException { - Schema.Parser avroSchemaParser = new Schema.Parser(); - Schema schema = avroSchemaParser.parse(schemaStr); - if (!schema.getType().equals(Schema.Type.RECORD)) { - throw new UnsupportedOperationException("Schema for table must be of type " + - "RECORD. Received type: " + schema.getType()); - } - List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size()); - for (Schema.Field field: schema.getFields()) { - ColumnDef colDef = new ColumnDef(field.name(), - new TypeDef(getTypeInfo(field.schema(), field.name())), field.doc()); - colDef.analyze(); - colDefs.add(colDef); - } - return colDefs; - } - - /** - * Parses the given Avro schema and returns the matching Impala type - * for this field. Handles primitive and complex types. - */ - private static Type getTypeInfo(Schema schema, String colName) - throws AnalysisException { - // Avro requires NULLable types to be defined as unions of some type T - // and NULL. This is annoying and we're going to hide it from the user. - if (isNullableType(schema)) { - return getTypeInfo(getColumnType(schema), colName); - } - - Schema.Type type = schema.getType(); - if (avroToImpalaPrimitiveTypeMap_.containsKey(type)) { - return avroToImpalaPrimitiveTypeMap_.get(type); - } - - switch(type) { - case ARRAY: - Type itemType = getTypeInfo(schema.getElementType(), colName); - return new ArrayType(itemType); - case MAP: - Type valueType = getTypeInfo(schema.getValueType(), colName); - return new MapType(Type.STRING, valueType); - case RECORD: - StructType structType = new StructType(); - for (Schema.Field field: schema.getFields()) { - Type fieldType = getTypeInfo(field.schema(), colName); - structType.addField(new StructField(field.name(), fieldType, field.doc())); - } - return structType; - case BYTES: - // Decimal is stored in Avro as a BYTE. - Type decimalType = getDecimalType(schema); - if (decimalType != null) return decimalType; - // TODO: Add support for stored Avro UNIONs by exposing them as STRUCTs in Impala. - case UNION: - case ENUM: - case FIXED: - case NULL: - default: { - throw new AnalysisException(String.format( - "Unsupported type '%s' of column '%s'", type.getName(), colName)); - } - } - } - - /** - * Returns true if this is a nullable type (a Union[T, Null]), false otherwise. - */ - private static boolean isNullableType(Schema schema) { - // [null, null] not allowed, so this check is ok. - return schema.getType().equals(Schema.Type.UNION) && schema.getTypes().size() == 2 && - (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) || - schema.getTypes().get(1).getType().equals(Schema.Type.NULL)); - } - - /** - * If a nullable type, get the schema for the non-nullable type which will - * provide Impala column type information. - */ - private static Schema getColumnType(Schema schema) { - List<Schema> types = schema.getTypes(); - return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0); - } - - /** - * Attempts to parse decimal type information from the Avro schema, returning - * a decimal ColumnType if successful or null if this schema does not map - * to a decimal type. - * Decimal is defined in Avro as a BYTE type with the logicalType property - * set to "decimal" and a specified scale/precision. - * Throws a SchemaParseException if the logicType=decimal, but scale/precision - * is not specified or in the incorrect format. 
- */ - private static Type getDecimalType(Schema schema) { - Preconditions.checkState(schema.getType() == Schema.Type.BYTES); - String logicalType = schema.getProp("logicalType"); - if (logicalType != null && logicalType.equalsIgnoreCase("decimal")) { - // Parse the scale/precision of the decimal type. - Integer scale = getDecimalProp(schema, "scale"); - // The Avro spec states that scale should default to zero if not set. - if (scale == null) scale = 0; - - // Precision is a required property according to the Avro spec. - Integer precision = getDecimalProp(schema, "precision"); - if (precision == null) { - throw new SchemaParseException( - "No 'precision' property specified for 'decimal' logicalType"); - } - return ScalarType.createDecimalType(precision, scale); - } - return null; - } - - /** - * Parses a decimal property and returns the value as an integer, or null - * if the property isn't set. Used to parse decimal scale/precision. - * Throws a SchemaParseException if the property doesn't parse to a - * natural number. - */ - private static Integer getDecimalProp(Schema schema, String propName) - throws SchemaParseException { - JsonNode node = schema.getJsonProp(propName); - if (node == null) return null; - int propValue = node.getValueAsInt(-1); - if (propValue < 0) { - throw new SchemaParseException(String.format("Invalid decimal '%s' " + - "property value: %s", propName, node.getValueAsText())); - } - return propValue; - } -} \ No newline at end of file
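For reference, a minimal sketch of how the AvroSchemaParser removed above maps an Avro record schema to Impala column definitions. The schema literal and wrapper class are hypothetical, and the pre-rename com.cloudera.impala packages are assumed to still be on the classpath:

    import java.util.List;

    import com.cloudera.impala.analysis.ColumnDef;
    import com.cloudera.impala.util.AvroSchemaParser;

    public class AvroSchemaParserExample {
      public static void main(String[] args) throws Exception {
        // A record with a primitive field, a nullable union, and a decimal stored as BYTES.
        String schemaStr = "{"
            + "\"type\": \"record\", \"name\": \"rec\", \"fields\": ["
            + "  {\"name\": \"id\", \"type\": \"long\"},"
            + "  {\"name\": \"name\", \"type\": [\"null\", \"string\"]},"
            + "  {\"name\": \"price\", \"type\": {\"type\": \"bytes\","
            + "    \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2}}"
            + "]}";
        // Expected mapping: id -> BIGINT, name -> STRING (the NULL branch of the
        // union is hidden from the user), price -> DECIMAL(10,2).
        List<ColumnDef> cols = AvroSchemaParser.parse(schemaStr);
        for (ColumnDef col : cols) {
          System.out.println(col.getColName() + " " + col.getType().toSql());
        }
      }
    }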
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java b/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java deleted file mode 100644 index f86c347..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/AvroSchemaUtils.java +++ /dev/null @@ -1,189 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; - -import com.cloudera.impala.analysis.ColumnDef; -import com.cloudera.impala.catalog.PrimitiveType; -import com.cloudera.impala.common.AnalysisException; -import com.cloudera.impala.common.FileSystemUtil; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -/** - * Contains utility functions for dealing with Avro schemas. - */ -public class AvroSchemaUtils { - - /** - * Gets an Avro table's JSON schema from the list of given table property search - * locations. The schema may be specified as a string literal or provided as a - * Hadoop FileSystem or http URL that points to the schema. Apart from ensuring - * that the JSON schema is not SCHEMA_NONE, this function does not perform any - * additional validation on the returned string (e.g., it may not be a valid - * schema). Returns the Avro schema or null if none was specified in the search - * locations. Throws an AnalysisException if a schema was specified, but could not - * be retrieved, e.g., because of an invalid URL. - */ - public static String getAvroSchema(List<Map<String, String>> schemaSearchLocations) - throws AnalysisException { - String url = null; - // Search all locations and break out on the first valid schema found. 
- for (Map<String, String> schemaLocation: schemaSearchLocations) { - if (schemaLocation == null) continue; - - String literal = - schemaLocation.get( - AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); - if (literal != null && !literal.equals(AvroSerdeUtils.SCHEMA_NONE)) return literal; - - url = schemaLocation.get( - AvroSerdeUtils.AvroTableProperties.SCHEMA_URL.getPropName()); - if (url != null && !url.equals(AvroSerdeUtils.SCHEMA_NONE)) { - url = url.trim(); - break; - } - } - if (url == null) return null; - - String schema = null; - InputStream urlStream = null; - try { - // TODO: Add support for https:// here. - if (url.toLowerCase().startsWith("http://")) { - urlStream = new URL(url).openStream(); - schema = IOUtils.toString(urlStream); - } else { - Path path = new Path(url); - FileSystem fs = null; - fs = path.getFileSystem(FileSystemUtil.getConfiguration()); - if (!fs.exists(path)) { - throw new AnalysisException(String.format( - "Invalid avro.schema.url: %s. Path does not exist.", url)); - } - schema = FileSystemUtil.readFile(path); - } - } catch (AnalysisException e) { - throw e; - } catch (IOException e) { - throw new AnalysisException(String.format( - "Failed to read Avro schema at: %s. %s ", url, e.getMessage())); - } catch (Exception e) { - throw new AnalysisException(String.format( - "Invalid avro.schema.url: %s. %s", url, e.getMessage())); - } finally { - if (urlStream != null) IOUtils.closeQuietly(urlStream); - } - return schema; - } - - /** - * Reconciles differences in names/types between the given list of column definitions - * and the column definitions corresponding to an Avro Schema. Populates 'warning' - * if there are inconsistencies between the column definitions and the Avro schema, - * Returns the reconciled column definitions according to the following conflict - * resolution policy: - * - * Mismatched number of columns -> Prefer Avro columns. - * Always prefer Avro schema except for column type CHAR/VARCHAR/STRING: - * A CHAR/VARCHAR/STRING column definition maps to an Avro STRING. The reconciled - * column will preserve the type in the column definition but use the column name - * and comment from the Avro schema. - */ - public static List<ColumnDef> reconcileSchemas( - List<ColumnDef> colDefs, List<ColumnDef> avroCols, StringBuilder warning) { - if (colDefs.size() != avroCols.size()) { - warning.append(String.format( - "Ignoring column definitions in favor of Avro schema.\n" + - "The Avro schema has %s column(s) but %s column definition(s) were given.", - avroCols.size(), colDefs.size())); - return avroCols; - } - - List<ColumnDef> result = Lists.newArrayListWithCapacity(colDefs.size()); - for (int i = 0; i < avroCols.size(); ++i) { - ColumnDef colDef = colDefs.get(i); - ColumnDef avroCol = avroCols.get(i); - Preconditions.checkNotNull(colDef.getType()); - Preconditions.checkNotNull(avroCol.getType()); - - // A CHAR/VARCHAR/STRING column definition maps to an Avro STRING, and is preserved - // as a CHAR/VARCHAR/STRING in the reconciled schema. Column name and comment - // are taken from the Avro schema. 
- if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) { - Preconditions.checkState( - avroCol.getType().getPrimitiveType() == PrimitiveType.STRING); - ColumnDef reconciledColDef = new ColumnDef( - avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment()); - try { - reconciledColDef.analyze(); - } catch (AnalysisException e) { - Preconditions.checkNotNull( - null, "reconciledColDef.analyze() should never throw."); - } - result.add(reconciledColDef); - } else { - result.add(avroCol); - } - - // Populate warning string if there are name and/or type inconsistencies. - if (!colDef.getColName().equals(avroCol.getColName()) || - !colDef.getType().equals(avroCol.getType())) { - if (warning.length() == 0) { - // Add warning preamble for the first mismatch. - warning.append("Resolved the following name and/or type inconsistencies " + - "between the column definitions and the Avro schema.\n"); - } - warning.append(String.format("Column definition at position %s: %s %s\n", - i, colDefs.get(i).getColName(), colDefs.get(i).getType().toSql())); - warning.append(String.format("Avro schema column at position %s: %s %s\n", - i, avroCols.get(i).getColName(), avroCols.get(i).getType().toSql())); - warning.append(String.format("Resolution at position %s: %s %s\n", - i, result.get(i).getColName(), result.get(i).getType().toSql())); - } - } - Preconditions.checkState(result.size() == avroCols.size()); - Preconditions.checkState(result.size() == colDefs.size()); - return result; - } - - /** - * Sets the comment of each column definition to 'from deserializer' if not already - * set. The purpose of this function is to provide behavioral consistency with - * Hive ('deserializer' is not applicable to Impala) with respect to column comments - * set for Avro tables. - */ - public static void setFromSerdeComment(List<ColumnDef> colDefs) { - for (ColumnDef colDef: colDefs) { - if (Strings.isNullOrEmpty(colDef.getComment())) { - colDef.setComment("from deserializer"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java b/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java deleted file mode 100644 index bce214e..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/DisjointSet.java +++ /dev/null @@ -1,142 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
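Before the DisjointSet source continues below, a brief sketch of how the AvroSchemaUtils helpers removed above compose with AvroSchemaParser at CREATE TABLE time. The table-property key/value, the schema URL, and the surrounding method are hypothetical:

    import java.util.List;
    import java.util.Map;

    import com.cloudera.impala.analysis.ColumnDef;
    import com.cloudera.impala.common.AnalysisException;
    import com.cloudera.impala.util.AvroSchemaParser;
    import com.cloudera.impala.util.AvroSchemaUtils;
    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Lists;

    public class AvroSchemaUtilsExample {
      // 'colDefsFromStmt' stands in for the column definitions parsed from the
      // CREATE TABLE statement.
      static List<ColumnDef> reconcile(List<ColumnDef> colDefsFromStmt)
          throws AnalysisException {
        List<Map<String, String>> locations = Lists.newArrayList();
        locations.add(ImmutableMap.of(
            "avro.schema.url", "hdfs:///schemas/alltypes.avsc"));  // hypothetical URL
        String schemaStr = AvroSchemaUtils.getAvroSchema(locations);
        List<ColumnDef> avroCols = AvroSchemaParser.parse(schemaStr);
        StringBuilder warning = new StringBuilder();
        List<ColumnDef> reconciled =
            AvroSchemaUtils.reconcileSchemas(colDefsFromStmt, avroCols, warning);
        AvroSchemaUtils.setFromSerdeComment(reconciled);
        if (warning.length() > 0) System.err.println(warning);
        return reconciled;
      }
    }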
- -package com.cloudera.impala.util; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * Basic implementation of the disjoint-set data structure. - * Stores a set of disjoint item sets and provides efficient implementations of mainly - * two operations: - * 1. Find the item set corresponding to a given member item (get() function) - * 2. Compute the union of two item sets (union() function) - */ -public class DisjointSet<T> { - // Maps from an item to its item set. - private final Map<T, Set<T>> itemSets_ = Maps.newHashMap(); - private final Set<Set<T>> uniqueSets_ = Sets.newHashSet(); - - /** - * Returns the item set corresponding to the given item or null if it - * doesn't exist. - */ - public Set<T> get(T item) { return itemSets_.get(item); } - - public Set<Set<T>> getSets() { return uniqueSets_; } - - /** - * Registers a new item set with a single item. Returns the new item set. - * Throws if such an item set already exists. - */ - public Set<T> makeSet(T item) { - if (itemSets_.containsKey(item)) { - throw new IllegalStateException( - "Item set for item already exists: " + item.toString()); - } - Set<T> s = Sets.newHashSet(item); - itemSets_.put(item, s); - uniqueSets_.add(s); - return s; - } - - /** - * Merges the two item sets belonging to the members a and b. The merged set contains - * at least a and b even if a or b did not have an associated item set. - * Returns false if the item sets of a and b are non-empty and already identical, - * true otherwise. - */ - public boolean union(T a, T b) { - Set<T> aItems = itemSets_.get(a); - Set<T> bItems = itemSets_.get(b); - // check if the sets are already identical - if (aItems != null && bItems != null && aItems == bItems) return false; - - // union(x, x) is equivalent to makeSet(x) - if (a.equals(b) && aItems == null) { - makeSet(a); - return true; - } - - // create sets for a or b if not present already - if (aItems == null) aItems = makeSet(a); - if (bItems == null) bItems = makeSet(b); - - // will contain the union of aItems and bItems - Set<T> mergedItems = aItems; - // always the smaller of the two sets to be merged - Set<T> updateItems = bItems; - if (bItems.size() > aItems.size()) { - mergedItems = bItems; - updateItems = aItems; - } - for (T item: updateItems) { - mergedItems.add(item); - itemSets_.put(item, mergedItems); - } - uniqueSets_.remove(updateItems); - return true; - } - - /** - * Merges all the item sets corresponding to the given items. Returns true if any item - * sets were merged or created, false otherwise (item sets are already identical). - */ - public boolean bulkUnion(Collection<T> items) { - if (items.isEmpty()) return false; - Iterator<T> it = items.iterator(); - T head = it.next(); - // bulkUnion(x) is equivalent to makeSet(x) - if (!it.hasNext()) { - if (get(head) != null) return false; - makeSet(head); - return true; - } - boolean result = false; - while(it.hasNext()) { - boolean changed = union(head, it.next()); - result = result || changed; - } - return result; - } - - /** - * Checks the internal consistency of this data structure. - * Throws an IllegalStateException if an inconsistency is detected. - */ - public void checkConsistency() { - Set<Set<T>> validatedSets = Sets.newHashSet(); - for (Set<T> itemSet: itemSets_.values()) { - // Avoid checking the same item set multiple times. 
- if (validatedSets.contains(itemSet)) continue; - // Validate that all items in this set are properly mapped to - // the set itself. - for (T item: itemSet) { - if (itemSet != itemSets_.get(item)) { - throw new IllegalStateException("DisjointSet is in an inconsistent state."); - } - } - validatedSets.add(itemSet); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/EventSequence.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/EventSequence.java b/fe/src/main/java/com/cloudera/impala/util/EventSequence.java deleted file mode 100644 index 6b12c2e..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/EventSequence.java +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import java.util.List; - -import com.cloudera.impala.thrift.TEventSequence; - -import com.google.common.collect.Lists; - -/** - * Wrapper around TEventSequence so that we can mark events with a single method call. - * Events are 'marked' as they happen (so in order, with no time-travel backwards). - */ -public class EventSequence { - private final List<Long> timestamps_ = Lists.newArrayList(); - private final List<String> labels_ = Lists.newArrayList(); - - private final long startTime_; - private final String name_; - - public EventSequence(String name) { - name_ = name; - startTime_ = System.nanoTime(); - } - - /** - * Saves an event at the current time with the given label. - */ - public void markEvent(String label) { - // Timestamps should be in ns resolution - timestamps_.add(System.nanoTime() - startTime_); - labels_.add(label); - } - - public TEventSequence toThrift() { - TEventSequence ret = new TEventSequence(); - ret.timestamps = timestamps_; - ret.labels = labels_; - ret.name = name_; - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java b/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java deleted file mode 100644 index 88a456d..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/FileWatchService.java +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import java.io.File; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Service to watch a file for changes. A thread periodically checks the file - * modification time and uses the provided {@link FileChangeListener} to notify - * a consumer. - */ -public class FileWatchService { - final static Logger LOG = LoggerFactory.getLogger(FileWatchService.class); - - // Default time to wait between checking the file. - static final long DEFAULT_CHECK_INTERVAL_MS = 10 * 1000; - - // Time between checking for changes. Mutable for unit tests. - private long checkIntervalMs_ = DEFAULT_CHECK_INTERVAL_MS; - - // Future returned by scheduleAtFixedRate(), needed to stop the checking thread. - private ScheduledFuture<?> fileCheckFuture_; - - private final AtomicBoolean running_; - private final FileChangeListener changeListener_; // Used to notify when changes occur. - private final File file_; // The file to check for changes. - private boolean alreadyWarned_; // Avoid repeatedly warning if the file is missing - private long prevChange_; // Time of the last observed change - - /** - * Listener used to notify of file changes. - */ - public interface FileChangeListener { - - /** - * Called when the file changes. - */ - void onFileChange(); - } - - public FileWatchService(File file, FileChangeListener listener) { - Preconditions.checkNotNull(file); - Preconditions.checkNotNull(listener); - Preconditions.checkArgument(file.exists()); - running_ = new AtomicBoolean(false); - file_ = file; - changeListener_ = listener; - prevChange_ = 0L; - alreadyWarned_ = false; - } - - /** - * Set the time (in milliseconds) to wait between checking the file for changes. - * Only used in tests. - */ - @VisibleForTesting - public void setCheckIntervalMs(long checkIntervalMs) { - checkIntervalMs_ = checkIntervalMs; - } - - /** - * Checks if the file has changed since the last observed change and if so, - * notifies the listener. - */ - private void checkFile() { - if (file_.exists()) { - long lastChange = file_.lastModified(); - if (lastChange > prevChange_) { - changeListener_.onFileChange(); - prevChange_ = lastChange; - alreadyWarned_ = false; - } - } else { - if (!alreadyWarned_) { - LOG.warn("File does not exist: {}", file_.getPath()); - alreadyWarned_ = true; - } - } - } - - /** - * Starts the thread to check for file changes. Continues checking for file changes - * every 'checkIntervalMs_' milliseconds until stop() is called. 
- */ - public synchronized void start() { - Preconditions.checkState(!running_.get()); - running_.set(true); - - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("FileWatchThread(" + file_.getPath() + ")-%d") - .build()); - fileCheckFuture_ = executor.scheduleAtFixedRate(new Runnable() { - public void run() { - try { - checkFile(); - } catch (SecurityException e) { - LOG.warn("Not allowed to check read file existence: " + file_.getPath(), e); - } - } - }, 0L, checkIntervalMs_, TimeUnit.MILLISECONDS); - } - - /** - * Stops the file watching thread. - */ - public synchronized void stop() { - Preconditions.checkState(running_.get()); - running_.set(false); - fileCheckFuture_.cancel(false); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java b/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java deleted file mode 100644 index 7523cc8..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/FsPermissionChecker.java +++ /dev/null @@ -1,301 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.Map; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclEntryType; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.AclEntryScope; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.hdfs.protocol.AclException; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; -import com.google.common.collect.Lists; - -/** - * Singleton class that can check whether the current user has permission to access paths - * in a FileSystem. 
- */ -public class FsPermissionChecker { - private final static Logger LOG = LoggerFactory.getLogger(FsPermissionChecker.class); - private final static FsPermissionChecker instance_; - private final static Configuration CONF; - protected final String user_; - private final Set<String> groups_ = new HashSet<String>(); - private final String supergroup_; - - static { - CONF = new Configuration(); - try { - instance_ = new FsPermissionChecker(); - } catch (IOException e) { - throw new RuntimeException( - "Error initializing FsPermissionChecker: " + e.getMessage(), e); - } - } - - private FsPermissionChecker() throws IOException { - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - groups_.addAll(Arrays.asList(ugi.getGroupNames())); - supergroup_ = CONF.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, - DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); - user_ = ugi.getShortUserName(); - } - - private boolean isSuperUser() { return groups_.contains(supergroup_); } - - private static List<AclEntryType> ACL_TYPE_PRIORITY = - ImmutableList.of(AclEntryType.USER, AclEntryType.GROUP, AclEntryType.OTHER); - - /** - * Allows checking different access permissions of a file without repeatedly accessing - * the underlying filesystem by caching the results of a status call at construction. - */ - public class Permissions { - private final FileStatus fileStatus_; - private final FsPermission permissions_; - private final AclStatus aclStatus_; - private Map<AclEntryType, List<AclEntry>> entriesByTypes_ = Maps.newHashMap(); - private AclEntry mask_; - - /** - * If aclStatus is null, ACL permissions are not checked. - */ - protected Permissions(FileStatus fileStatus, AclStatus aclStatus) { - Preconditions.checkNotNull(fileStatus); - fileStatus_ = fileStatus; - permissions_ = fileStatus.getPermission(); - aclStatus_ = aclStatus; - if (aclStatus_ == null) return; - - // Group the ACLs by type, so that we can apply them in correct priority order. Not - // clear from documentation whether aclStatus_.getEntries() guarantees this - // ordering, so this is defensive. - for (AclEntryType t: ACL_TYPE_PRIORITY) { - entriesByTypes_.put(t, Lists.<AclEntry>newArrayList()); - } - - List<AclEntry> fullAclList = - getAclFromPermAndEntries(permissions_, aclStatus_.getEntries()); - for (AclEntry e: fullAclList) { - if (e.getType() == AclEntryType.MASK && e.getScope() != AclEntryScope.DEFAULT) { - mask_ = e; - } else if (isApplicableAcl(e)) { - entriesByTypes_.get(e.getType()).add(e); - } - } - } - - /** - * Returns true if the mask should apply. The mask ACL applies only to unnamed user - * ACLs (e.g. user::r-x), and all group ACLs. - */ - private boolean shouldApplyMask(AclEntry acl) { - if (mask_ == null) return false; - - switch (acl.getType()) { - case USER: - return acl.getName() != null; - case GROUP: - return true; - } - return false; - } - - /** - * Returns true if this ACL applies to the current user and / or group - */ - private boolean isApplicableAcl(AclEntry e) { - // Default ACLs are not used for permission checking, but instead control the - // permissions received by child directories - if (e.getScope() == AclEntryScope.DEFAULT) return false; - - switch (e.getType()) { - case USER: - String aclUser = e.getName() == null ? aclStatus_.getOwner() : e.getName(); - return FsPermissionChecker.this.user_.equals(aclUser); - case GROUP: - String aclGroup = e.getName() == null ? 
aclStatus_.getGroup() : e.getName(); - return FsPermissionChecker.this.groups_.contains(aclGroup); - case OTHER: - return true; - case MASK: - return false; - default: - LOG.warn("Unknown Acl type: " + e.getType()); - return false; - } - } - - /** - * Returns true if ACLs allow 'action', false if they explicitly disallow 'action', - * and 'null' if no ACLs are available. - * See http://users.suse.com/~agruen/acl/linux-acls/online for more details about - * acl access check algorithm. - */ - private Boolean checkAcls(FsAction action) { - // ACLs may not be enabled, so we need this ternary logic. If no ACLs are available, - // returning null causes us to fall back to standard ugo permissions. - if (aclStatus_ == null) return null; - - // Remember if there is an applicable ACL entry, including owner user, named user, - // owning group, named group. - boolean foundMatch = false; - for (AclEntryType t: ACL_TYPE_PRIORITY) { - for (AclEntry e: entriesByTypes_.get(t)) { - if (t == AclEntryType.OTHER) { - // Processed all ACL entries except the OTHER entry. - // If found applicable ACL entries but none of them contain requested - // permission, deny access. Otherwise check OTHER entry. - return foundMatch ? false : e.getPermission().implies(action); - } - // If there is an applicable mask, 'action' is allowed iff both the mask and - // the underlying ACL permit it. - if (e.getPermission().implies(action)) { - if (shouldApplyMask(e)) { - if (mask_.getPermission().implies(action)) return true; - } else { - return true; - } - } - // User ACL entry has priority, no need to continue check. - if (t == AclEntryType.USER) return false; - - foundMatch = true; - } - } - return false; - } - - /** - * Returns true if the current user can perform the given action given these - * permissions. - */ - public boolean checkPermissions(FsAction action) { - if (FsPermissionChecker.this.isSuperUser()) return true; - Boolean aclPerms = checkAcls(action); - if (aclPerms != null) return aclPerms; - - // Check user, group and then 'other' permissions in turn. - if (FsPermissionChecker.this.user_.equals(fileStatus_.getOwner())) { - // If the user matches, we must return their access rights whether or not the user - // is allowed to access without checking the group. This is counter-intuitive if - // the user cannot access the file, but the group permissions would allow it, but - // is consistent with UNIX behaviour. - return permissions_.getUserAction().implies(action); - } - - if (FsPermissionChecker.this.groups_.contains(fileStatus_.getGroup())) { - return permissions_.getGroupAction().implies(action); - } - return permissions_.getOtherAction().implies(action); - } - - public boolean canRead() { return checkPermissions(FsAction.READ); } - public boolean canWrite() { return checkPermissions(FsAction.WRITE); } - public boolean canReadAndWrite() { return canRead() && canWrite(); } - - // This was originally lifted from Hadoop. Won't need it if HDFS-7177 is resolved. - // getAclStatus() returns just extended ACL entries, the default file permissions - // like "user::,group::,other::" are not included. We need to combine them together - // to get full logic ACL list. - private List<AclEntry> getAclFromPermAndEntries(FsPermission perm, - List<AclEntry> entries) { - // File permission always have 3 items. - List<AclEntry> aclEntries = Lists.newArrayListWithCapacity(entries.size() + 3); - - // Owner entry implied by owner permission bits. 
- aclEntries.add(new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.USER) - .setPermission(perm.getUserAction()) - .build()); - - // All extended access ACL entries add by "-setfacl" other than default file - // permission. - boolean hasAccessAcl = false; - for (AclEntry entry: entries) { - // AclEntry list should be ordered, all ACCESS one are in first half, DEFAULT one - // are in second half, so no need to continue here. - if (entry.getScope() == AclEntryScope.DEFAULT) break; - hasAccessAcl = true; - aclEntries.add(entry); - } - - // Mask entry implied by group permission bits, or group entry if there is - // no access ACL (only default ACL). - aclEntries.add(new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS) - .setType(hasAccessAcl ? AclEntryType.MASK : AclEntryType.GROUP) - .setPermission(perm.getGroupAction()) - .build()); - - // Other entry implied by other bits. - aclEntries.add(new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS) - .setType(AclEntryType.OTHER) - .setPermission(perm.getOtherAction()) - .build()); - - return aclEntries; - } - } - - /** - * Returns a Permissions object that can answer all access permission queries for the - * given path. - */ - public Permissions getPermissions(FileSystem fs, Path path) throws IOException { - Preconditions.checkNotNull(fs); - Preconditions.checkNotNull(path); - AclStatus aclStatus = null; - try { - aclStatus = fs.getAclStatus(path); - } catch (AclException ex) { - LOG.trace("No ACLs retrieved, skipping ACLs check (HDFS will enforce ACLs)", ex); - } catch (UnsupportedOperationException ex) { - LOG.trace("No ACLs retrieved, unsupported", ex); - } - return new Permissions(fs.getFileStatus(path), aclStatus); - } - - /** - * Returns the FsPermissionChecker singleton. - */ - public static FsPermissionChecker getInstance() { return instance_; } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java b/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java deleted file mode 100644 index a5e1eb1..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/GlogAppender.java +++ /dev/null @@ -1,129 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
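Before the GlogAppender source continues below, a short sketch of querying the FsPermissionChecker removed above for access rights on a path. The warehouse path is hypothetical:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import com.cloudera.impala.common.FileSystemUtil;
    import com.cloudera.impala.util.FsPermissionChecker;

    public class FsPermissionCheckerExample {
      public static void main(String[] args) throws Exception {
        Path path = new Path("/user/hive/warehouse/t");  // hypothetical location
        FileSystem fs = path.getFileSystem(FileSystemUtil.getConfiguration());
        // Permissions caches one FileStatus/AclStatus lookup and answers all
        // subsequent access questions locally.
        FsPermissionChecker.Permissions perms =
            FsPermissionChecker.getInstance().getPermissions(fs, path);
        System.out.println("read=" + perms.canRead()
            + " write=" + perms.canWrite()
            + " read+write=" + perms.canReadAndWrite());
      }
    }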
- -package com.cloudera.impala.util; - -import java.util.Properties; - -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PropertyConfigurator; -import org.apache.log4j.spi.LoggingEvent; - -import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.thrift.TLogLevel; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; - -/** - * log4j appender which calls into C++ code to log messages at their correct severities - * via glog. - */ -public class GlogAppender extends AppenderSkeleton { - // GLOG takes care of formatting, so we don't require a layout - public boolean requiresLayout() { return false; } - - // Required implementation by superclass. - public void ActivateOptions() { } - - // Required implementation by superclass - public void close() { } - - private TLogLevel levelToSeverity(Level level) { - Preconditions.checkState(!level.equals(Level.OFF)); - // TODO: Level does not work well in a HashMap or switch statement due to some - // strangeness with equality testing. - if (level.equals(Level.TRACE)) return TLogLevel.VLOG_3; - if (level.equals(Level.ALL)) return TLogLevel.VLOG_3; - if (level.equals(Level.DEBUG)) return TLogLevel.VLOG; - if (level.equals(Level.ERROR)) return TLogLevel.ERROR; - if (level.equals(Level.FATAL)) return TLogLevel.FATAL; - if (level.equals(Level.INFO)) return TLogLevel.INFO; - if (level.equals(Level.WARN)) return TLogLevel.WARN; - - throw new IllegalStateException("Unknown log level: " + level.toString()); - } - - @Override - public void append(LoggingEvent event) { - Level level = event.getLevel(); - if (level.equals(Level.OFF)) return; - - String msg = event.getRenderedMessage(); - if (event.getThrowableInformation() != null) { - msg = msg + "\nJava exception follows:\n" + - Joiner.on("\n").join(event.getThrowableStrRep()); - } - int lineNumber = Integer.parseInt(event.getLocationInformation().getLineNumber()); - String fileName = event.getLocationInformation().getFileName(); - NativeLogger.LogToGlog( - levelToSeverity(level).getValue(), msg, fileName, lineNumber); - } - - /** - * Returns a log4j level string corresponding to the Glog log level - */ - private static String log4jLevelForTLogLevel(TLogLevel logLevel) - throws InternalException { - switch (logLevel) { - case INFO: return "INFO"; - case WARN: return "WARN"; - case ERROR: return "ERROR"; - case FATAL: return "FATAL"; - case VLOG: - case VLOG_2: return "DEBUG"; - case VLOG_3: return "TRACE"; - default: throw new InternalException("Unknown log level:" + logLevel); - } - } - - /** - * Manually override Log4j root logger configuration. Any values in log4j.properties - * not overridden (that is, anything but the root logger and its default level) will - * continue to have effect. - * - impalaLogLevel - the maximum log level for com.cloudera.impala.* classes - * - otherLogLevel - the maximum log level for all other classes - */ - public static void Install(TLogLevel impalaLogLevel, TLogLevel otherLogLevel) - throws InternalException { - Properties properties = new Properties(); - properties.setProperty("log4j.appender.glog", GlogAppender.class.getName()); - - // These settings are relatively subtle. log4j provides many ways to filter log - // messages, and configuring them in the right order is a bit of black magic. - // - // The 'Threshold' property supercedes everything, so must be set to its most - // permissive and applies to any message sent to the glog appender. 
- // - // The 'rootLogger' property controls the default maximum logging level (where more - // verbose->larger logging level) for the entire space of classes. This will apply to - // all non-Impala classes, so is set to otherLogLevel. - // - // Finally we can configure per-package logging which overrides the rootLogger - // setting. In order to control Impala's logging independently of the rest of the - // world, we set the log level for com.cloudera.impala. - properties.setProperty("log4j.rootLogger", - log4jLevelForTLogLevel(otherLogLevel) + ",glog"); - properties.setProperty("log4j.appender.glog.Threshold", "TRACE"); - properties.setProperty("log4j.logger.com.cloudera.impala", - log4jLevelForTLogLevel(impalaLogLevel)); - PropertyConfigurator.configure(properties); - Logger.getLogger(GlogAppender.class).info(String.format("Logging initialized. " + - "Impala: %s, All other: %s", impalaLogLevel, otherLogLevel)); - } -}; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java b/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java deleted file mode 100644 index a3a1fa0..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/HdfsCachingUtil.java +++ /dev/null @@ -1,515 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; -import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.log4j.Logger; - -import com.cloudera.impala.analysis.TableName; -import com.cloudera.impala.catalog.HdfsPartition; -import com.cloudera.impala.common.FileSystemUtil; -import com.cloudera.impala.common.ImpalaException; -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.cloudera.impala.thrift.JniCatalogConstants; -import com.cloudera.impala.thrift.THdfsCachingOp; -import com.google.common.base.Preconditions; - -/** - * Utility class for submitting and dropping HDFS cache requests. - */ -public class HdfsCachingUtil { - private static final Logger LOG = Logger.getLogger(HdfsCachingUtil.class); - - // The key name used to save cache directive IDs in table/partition properties. 
- public final static String CACHE_DIR_ID_PROP_NAME = "cache_directive_id"; - - // The key name used to store the replication factor for cached files - public final static String CACHE_DIR_REPLICATION_PROP_NAME = "cache_replication"; - - // The number of caching refresh intervals that can go by when waiting for data to - // become cached before assuming no more progress is being made. - private final static int MAX_UNCHANGED_CACHING_REFRESH_INTERVALS = 5; - - private static DistributedFileSystem dfs = null; - - /** - * Returns the dfs singleton object. - */ - private static DistributedFileSystem getDfs() throws ImpalaRuntimeException { - if (dfs == null) { - try { - dfs = FileSystemUtil.getDistributedFileSystem(); - } catch (IOException e) { - throw new ImpalaRuntimeException("HdfsCachingUtil failed to initialize the " + - "DistributedFileSystem: ", e); - } - } - return dfs; - } - - /** - * Caches the location of the given Hive Metastore Table and updates the - * table's properties with the submitted cache directive ID. The caller is - * responsible for not caching the same table twice, as HDFS will create a second - * cache directive even if it is similar to an already existing one. - * - * Returns the ID of the submitted cache directive and throws if there is an error - * submitting. - */ - public static long submitCacheTblDirective( - org.apache.hadoop.hive.metastore.api.Table table, - String poolName, short replication) throws ImpalaRuntimeException { - long id = HdfsCachingUtil.submitDirective(new Path(table.getSd().getLocation()), - poolName, replication); - table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id)); - table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication)); - return id; - } - - /** - * Caches the location of the given partition and updates the - * partitions's properties with the submitted cache directive ID. The caller is - * responsible for not caching the same partition twice, as HDFS will create a second - * cache directive even if it is similar to an already existing one. - * - * Returns the ID of the submitted cache directive and throws if there is an error - * submitting the directive. - */ - public static long submitCachePartitionDirective(HdfsPartition part, - String poolName, short replication) throws ImpalaRuntimeException { - long id = HdfsCachingUtil.submitDirective(new Path(part.getLocation()), - poolName, replication); - part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id)); - part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication)); - return id; - } - - /** - * Convenience method for working directly on a metastore partition. See - * submitCachePartitionDirective(HdfsPartition, String, short) for more details. - */ - public static long submitCachePartitionDirective( - org.apache.hadoop.hive.metastore.api.Partition part, - String poolName, short replication) throws ImpalaRuntimeException { - long id = HdfsCachingUtil.submitDirective(new Path(part.getSd().getLocation()), - poolName, replication); - part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id)); - part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication)); - return id; - } - - /** - * Removes the cache directive associated with the table from HDFS, uncaching all - * data. Also updates the table's metadata. No-op if the table is not cached. 
- */ - public static void uncacheTbl(org.apache.hadoop.hive.metastore.api.Table table) - throws ImpalaRuntimeException { - Preconditions.checkNotNull(table); - LOG.debug("Uncaching table: " + table.getDbName() + "." + table.getTableName()); - Long id = getCacheDirectiveId(table.getParameters()); - if (id == null) return; - HdfsCachingUtil.removeDirective(id); - table.getParameters().remove(CACHE_DIR_ID_PROP_NAME); - table.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME); - } - - /** - * Removes the cache directive associated with the partition from HDFS, uncaching all - * data. Also updates the partition's metadata to remove the cache directive ID. - * No-op if the table is not cached. - */ - public static void uncachePartition(HdfsPartition part) throws ImpalaException { - Preconditions.checkNotNull(part); - Long id = getCacheDirectiveId(part.getParameters()); - if (id == null) return; - HdfsCachingUtil.removeDirective(id); - part.getParameters().remove(CACHE_DIR_ID_PROP_NAME); - part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME); - } - - /** - * Convenience method for working directly on a metastore partition. See - * uncachePartition(HdfsPartition) for more details. - */ - public static void uncachePartition( - org.apache.hadoop.hive.metastore.api.Partition part) throws ImpalaException { - Preconditions.checkNotNull(part); - Long id = getCacheDirectiveId(part.getParameters()); - if (id == null) return; - HdfsCachingUtil.removeDirective(id); - part.getParameters().remove(CACHE_DIR_ID_PROP_NAME); - part.getParameters().remove(CACHE_DIR_REPLICATION_PROP_NAME); - } - - /** - * Returns the cache directive ID from the given table/partition parameter - * map. Returns null if the CACHE_DIR_ID_PROP_NAME key was not set or if - * there was an error parsing the associated ID. - */ - public static Long getCacheDirectiveId(Map<String, String> params) { - if (params == null) return null; - String idStr = params.get(CACHE_DIR_ID_PROP_NAME); - if (idStr == null) return null; - try { - return Long.parseLong(idStr); - } catch (NumberFormatException e) { - return null; - } - } - - /** - * Given a cache directive ID, returns the pool the directive is cached in. - * Returns null if no outstanding cache directive match this ID. - */ - public static String getCachePool(long directiveId) - throws ImpalaRuntimeException { - CacheDirectiveEntry entry = getDirective(directiveId); - return entry == null ? null : entry.getInfo().getPool(); - } - - /** - * Given a cache directive ID, returns the replication factor for the directive. - * Returns null if no outstanding cache directives match this ID. - */ - public static Short getCacheReplication(long directiveId) - throws ImpalaRuntimeException { - CacheDirectiveEntry entry = getDirective(directiveId); - return entry != null ? entry.getInfo().getReplication() : null; - } - - /** - * Returns the cache replication value from the parameters map. We assume that only - * cached table parameters are used and the property is always present. 
- */ - public static Short getCachedCacheReplication(Map<String, String> params) { - Preconditions.checkNotNull(params); - String replication = params.get(CACHE_DIR_REPLICATION_PROP_NAME); - if (replication == null) { - return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR; - } - try { - return Short.parseShort(replication); - } catch (NumberFormatException e) { - return JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR; - } - } - - /** - * Waits on a cache directive to either complete or stop making progress. Progress is - * checked by polling the HDFS caching stats every - * DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS. We verify the request's - * "currentBytesCached" is increasing compared to "bytesNeeded". - * If "currentBytesCached" == "bytesNeeded" or if no progress is made for a - * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS, this function returns. - */ - public static void waitForDirective(long directiveId) - throws ImpalaRuntimeException { - long bytesNeeded = 0L; - long currentBytesCached = 0L; - CacheDirectiveEntry cacheDir = getDirective(directiveId); - if (cacheDir == null) return; - - bytesNeeded = cacheDir.getStats().getBytesNeeded(); - currentBytesCached = cacheDir.getStats().getBytesCached(); - LOG.debug(String.format("Waiting on cache directive id: %d. Bytes " + - "cached (%d) / needed (%d)", directiveId, currentBytesCached, bytesNeeded)); - // All the bytes are cached, just return. - if (bytesNeeded == currentBytesCached) return; - - // The refresh interval is how often HDFS will update cache directive stats. We use - // this value to determine how frequently we should poll for changes. - long hdfsRefreshIntervalMs = getDfs().getConf().getLong( - DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, - DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT); - Preconditions.checkState(hdfsRefreshIntervalMs > 0); - - // Loop until either MAX_UNCHANGED_CACHING_REFRESH_INTERVALS have passed with no - // changes or all required data is cached. - int unchangedCounter = 0; - while (unchangedCounter < MAX_UNCHANGED_CACHING_REFRESH_INTERVALS) { - long previousBytesCached = currentBytesCached; - cacheDir = getDirective(directiveId); - if (cacheDir == null) return; - currentBytesCached = cacheDir.getStats().getBytesCached(); - bytesNeeded = cacheDir.getStats().getBytesNeeded(); - if (currentBytesCached == bytesNeeded) { - LOG.debug(String.format("Cache directive id: %d has completed." + - "Bytes cached (%d) / needed (%d)", directiveId, currentBytesCached, - bytesNeeded)); - return; - } - - if (currentBytesCached == previousBytesCached) { - ++unchangedCounter; - } else { - unchangedCounter = 0; - } - try { - // Sleep for the refresh interval + a little bit more to ensure a full interval - // has completed. A value of 25% the refresh interval was arbitrarily chosen. - Thread.sleep((long) (hdfsRefreshIntervalMs * 1.25)); - } catch (InterruptedException e) { /* ignore */ } - } - LOG.warn(String.format("No changes in cached bytes in: %d(ms). All data may not " + - "be cached. Final stats for cache directive id: %d. Bytes cached (%d)/needed " + - "(%d)", hdfsRefreshIntervalMs * MAX_UNCHANGED_CACHING_REFRESH_INTERVALS, - directiveId, currentBytesCached, bytesNeeded)); - } - - /** - * Submits a new caching directive for the specified cache pool name, path and - * replication. Returns the directive ID if the submission was successful or an - * ImpalaRuntimeException if the submission fails. 
- */ - private static long submitDirective(Path path, String poolName, short replication) - throws ImpalaRuntimeException { - Preconditions.checkNotNull(path); - Preconditions.checkState(poolName != null && !poolName.isEmpty()); - CacheDirectiveInfo info = new CacheDirectiveInfo.Builder() - .setExpiration(Expiration.NEVER) - .setPool(poolName) - .setReplication(replication) - .setPath(path).build(); - LOG.debug("Submitting cache directive: " + info.toString()); - try { - return getDfs().addCacheDirective(info); - } catch (IOException e) { - throw new ImpalaRuntimeException(e.getMessage(), e); - } - } - - /** - * Update cache directive for a table and updates the metastore parameters. - * Returns the cache directive ID - */ - public static long modifyCacheDirective(Long id, - org.apache.hadoop.hive.metastore.api.Table table, - String poolName, short replication) throws ImpalaRuntimeException { - Preconditions.checkNotNull(id); - HdfsCachingUtil.modifyCacheDirective(id, new Path(table.getSd().getLocation()), - poolName, replication); - table.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id)); - table.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication)); - return id; - } - - /** - * Update cache directive for a partition and update the metastore parameters. - * Returns the cache directive ID - */ - public static long modifyCacheDirective(Long id, HdfsPartition part, String poolName, - short replication) throws ImpalaRuntimeException { - Preconditions.checkNotNull(id); - HdfsCachingUtil.modifyCacheDirective(id, new Path(part.getLocation()), - poolName, replication); - part.putToParameters(CACHE_DIR_ID_PROP_NAME, Long.toString(id)); - part.putToParameters(CACHE_DIR_REPLICATION_PROP_NAME, Long.toString(replication)); - return id; - } - - /** - * Update an existing cache directive to avoid having the same entry multiple - * times - */ - private static void modifyCacheDirective(Long id, Path path, String poolName, - short replication) throws ImpalaRuntimeException { - Preconditions.checkNotNull(path); - Preconditions.checkNotNull(id); - Preconditions.checkState(poolName != null && !poolName.isEmpty()); - CacheDirectiveInfo info = new CacheDirectiveInfo.Builder() - .setId(id) - .setExpiration(Expiration.NEVER) - .setPool(poolName) - .setReplication(replication) - .setPath(path).build(); - LOG.debug("Modifying cache directive: " + info.toString()); - try { - getDfs().modifyCacheDirective(info); - } catch (IOException e) { - throw new ImpalaRuntimeException(e.getMessage(), e); - } - } - - /** - * Removes the given cache directive if it exists, uncaching the data. If the - * cache request does not exist in HDFS no error is returned. - * Throws an ImpalaRuntimeException if there was any problem removing the - * directive. - */ - private static void removeDirective(long directiveId) throws ImpalaRuntimeException { - LOG.debug("Removing cache directive id: " + directiveId); - try { - getDfs().removeCacheDirective(directiveId); - } catch (IOException e) { - // There is no special exception type for the case where a directive ID does not - // exist so we must inspect the error message. - if (e.getMessage().contains("No directive with ID")) return; - throw new ImpalaRuntimeException(e.getMessage(), e); - } - } - - /** - * Gets the cache directive matching the given ID. Returns null if no matching - * directives were found. 
- */ - private static CacheDirectiveEntry getDirective(long directiveId) - throws ImpalaRuntimeException { - LOG.trace("Getting cache directive id: " + directiveId); - CacheDirectiveInfo filter = new CacheDirectiveInfo.Builder() - .setId(directiveId) - .build(); - try { - RemoteIterator<CacheDirectiveEntry> itr = getDfs().listCacheDirectives(filter); - if (itr.hasNext()) return itr.next(); - } catch (IOException e) { - // Handle connection issues with e.g. HDFS and possible not found errors - throw new ImpalaRuntimeException(e.getMessage(), e); - } - throw new ImpalaRuntimeException( - "HDFS cache directive filter returned empty result. This must not happen"); - } - - /** - * Check if the poolName matches the pool of the cache directive - * identified by directiveId - */ - public static boolean isSamePool(String poolName, Long directiveId) - throws ImpalaRuntimeException { - return poolName.equals(getCachePool(directiveId)); - } - - /** - * Helper method for frequent lookup of replication factor in the thrift caching - * structure. - */ - public static short getReplicationOrDefault(THdfsCachingOp op) { - return op.isSetReplication() ? op.getReplication() : - JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR; - } - - /** - * Returns a boolean indicating if the given thrift caching operation would perform an - * update on an already existing cache directive. - */ - public static boolean isUpdateOp(THdfsCachingOp op, Map<String, String> params) - throws ImpalaRuntimeException { - - Long directiveId = Long.parseLong(params.get(CACHE_DIR_ID_PROP_NAME)); - CacheDirectiveEntry entry = getDirective(directiveId); - Preconditions.checkNotNull(entry); - - // Verify cache pool - if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) { - return false; - } - - // Check cache replication factor - if ((op.isSetReplication() && op.getReplication() != - entry.getInfo().getReplication()) || ( !op.isSetReplication() && - entry.getInfo().getReplication() != - JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR)) { - return true; - } - return false; - } - - /** - * Validates the properties of the chosen cache pool. Throws on error. - */ - public static void validateCachePool(THdfsCachingOp op, Long directiveId, - TableName table, HdfsPartition partition) - throws ImpalaRuntimeException { - - CacheDirectiveEntry entry = getDirective(directiveId); - Preconditions.checkNotNull(entry); - - if (!op.getCache_pool_name().equals(entry.getInfo().getPool())) { - throw new ImpalaRuntimeException(String.format("Cannot cache partition in " + - "pool '%s' because it is already cached in '%s'. To change the cache " + - "pool for this partition, first uncache using: ALTER TABLE %s.%s " + - "%sSET UNCACHED", op.getCache_pool_name(), - entry.getInfo().getPool(), table.getDb(), table, - // Insert partition string if partition non null - partition != null ? String.format(" PARTITION(%s) ", - partition.getPartitionName().replaceAll("/", ", ")) : "")); - } - } - - /** - * Validates the properties of the chosen cache pool. Throws on error. - */ - public static void validateCachePool(THdfsCachingOp op, Long directiveId, - TableName table) throws ImpalaRuntimeException { - validateCachePool(op, directiveId, table, null); - } - - /** - * Validates and returns true if a parameter map contains a cache directive ID and - * validates it against the NameNode to make sure it exists. If the cache - * directive ID does not exist, we remove the value from the parameter map, - * issue a log message and return false. 
As the value is not written back to the - * Hive MS from this method, the result will be only valid until the next metadata - * fetch. Lastly, we update the cache replication factor in the parameters with the - * value read from HDFS. - */ - public static boolean validateCacheParams(Map<String, String> params) { - Long directiveId = getCacheDirectiveId(params); - if (directiveId == null) return false; - - CacheDirectiveEntry entry = null; - try { - entry = getDirective(directiveId); - } catch (ImpalaRuntimeException e) { - if (e.getCause() != null && e.getCause() instanceof RemoteException) { - // This exception signals that the cache directive no longer exists. - LOG.error("Cache directive does not exist", e); - params.remove(CACHE_DIR_ID_PROP_NAME); - params.remove(CACHE_DIR_REPLICATION_PROP_NAME); - } else { - // This exception signals that there was a connection problem with HDFS. - LOG.error("IO Exception, possible connectivity issues with HDFS", e); - } - return false; - } - Preconditions.checkNotNull(entry); - - // On the upgrade path the property might not exist, if it exists - // and is different from the one from the meta store, issue a warning. - String replicationFactor = params.get(CACHE_DIR_REPLICATION_PROP_NAME); - if (replicationFactor != null && - Short.parseShort(replicationFactor) != entry.getInfo().getReplication()) { - LOG.info("Replication factor for entry in HDFS differs from value in Hive MS: " + - entry.getInfo().getPath().toString() + " " + - entry.getInfo().getReplication().toString() + " != " + - params.get(CACHE_DIR_REPLICATION_PROP_NAME)); - } - params.put(CACHE_DIR_REPLICATION_PROP_NAME, - String.valueOf(entry.getInfo().getReplication())); - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java b/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java deleted file mode 100644 index 4f627d8..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/KuduUtil.java +++ /dev/null @@ -1,268 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package com.cloudera.impala.util; - -import java.io.StringReader; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonReader; - -import com.cloudera.impala.catalog.ScalarType; -import com.cloudera.impala.common.ImpalaRuntimeException; -import com.cloudera.impala.thrift.TDistributeByRangeParam; -import com.cloudera.impala.thrift.TRangeLiteral; -import com.cloudera.impala.thrift.TRangeLiteralList; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.Schema; -import org.apache.kudu.Type; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.PartialRow; - -import static com.cloudera.impala.catalog.Type.parseColumnType; -import static java.lang.String.format; - -public class KuduUtil { - - private static final String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing splits keys."; - - /** - * Compare the schema of a HMS table and a Kudu table. Returns true if both tables have - * a matching schema. - */ - public static boolean compareSchema(Table msTable, KuduTable kuduTable) - throws ImpalaRuntimeException { - List<FieldSchema> msFields = msTable.getSd().getCols(); - List<ColumnSchema> kuduFields = kuduTable.getSchema().getColumns(); - if (msFields.size() != kuduFields.size()) return false; - - HashMap<String, ColumnSchema> kuduFieldMap = Maps.newHashMap(); - for (ColumnSchema kuduField : kuduFields) { - kuduFieldMap.put(kuduField.getName().toUpperCase(), kuduField); - } - - for (FieldSchema msField : msFields) { - ColumnSchema kuduField = kuduFieldMap.get(msField.getName().toUpperCase()); - if (kuduField == null - || fromImpalaType(parseColumnType(msField.getType())) != kuduField.getType()) { - return false; - } - } - - return true; - } - - /** - * Parses split keys from statements. - * - * Split keys are expected to be in json, as an array of arrays, in the form: - * '[[value1_col1, value1_col2, ...], [value2_col1, value2_col2, ...], ...]' - * - * Each inner array corresponds to a split key and should have one matching entry for - * each key column specified in 'schema'. - */ - public static List<PartialRow> parseSplits(Schema schema, String kuduSplits) - throws ImpalaRuntimeException { - - // If there are no splits return early. - if (kuduSplits == null || kuduSplits.isEmpty()) return ImmutableList.of(); - - ImmutableList.Builder<PartialRow> splitRows = ImmutableList.builder(); - - // ...Otherwise parse the splits. We're expecting splits in the format of a list of - // lists of keys. We only support specifying splits for int and string keys - // (currently those are the only type of keys allowed in Kudu too). 
- try { - JsonReader jr = Json.createReader(new StringReader(kuduSplits)); - JsonArray keysList = jr.readArray(); - for (int i = 0; i < keysList.size(); i++) { - PartialRow splitRow = new PartialRow(schema); - JsonArray compoundKey = keysList.getJsonArray(i); - if (compoundKey.size() != schema.getPrimaryKeyColumnCount()) { - throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE + - " Wrong number of keys."); - } - for (int j = 0; j < compoundKey.size(); j++) { - setKey(schema.getColumnByIndex(j).getType(), compoundKey, j, splitRow); - } - splitRows.add(splitRow); - } - } catch (ImpalaRuntimeException e) { - throw e; - } catch (Exception e) { - throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE + " Problem parsing json" - + ": " + e.getMessage(), e); - } - - return splitRows.build(); - } - - /** - * Given the TDistributeByRangeParam from the CREATE statement, creates the - * appropriate split rows. - */ - public static List<PartialRow> parseSplits(Schema schema, - TDistributeByRangeParam param) throws ImpalaRuntimeException { - ImmutableList.Builder<PartialRow> splitRows = ImmutableList.builder(); - for (TRangeLiteralList literals : param.getSplit_rows()) { - PartialRow splitRow = new PartialRow(schema); - List<TRangeLiteral> literalValues = literals.getValues(); - for (int i = 0; i < literalValues.size(); ++i) { - String colName = param.getColumns().get(i); - ColumnSchema col = schema.getColumn(colName); - setKey(col.getType(), literalValues.get(i), schema.getColumnIndex(colName), - colName, splitRow); - } - splitRows.add(splitRow); - } - return splitRows.build(); - } - - /** - * Sets the value in 'key' at 'pos', given the json representation. - */ - private static void setKey(Type type, JsonArray array, int pos, PartialRow key) - throws ImpalaRuntimeException { - switch (type) { - case BOOL: key.addBoolean(pos, array.getBoolean(pos)); break; - case INT8: key.addByte(pos, (byte) array.getInt(pos)); break; - case INT16: key.addShort(pos, (short) array.getInt(pos)); break; - case INT32: key.addInt(pos, array.getInt(pos)); break; - case INT64: key.addLong(pos, array.getJsonNumber(pos).longValue()); break; - case STRING: key.addString(pos, array.getString(pos)); break; - default: - throw new ImpalaRuntimeException("Key columns not supported for type: " - + type.toString()); - } - } - - /** - * Sets the value in 'key' at 'pos', given the range literal. 
- */ - private static void setKey(Type type, TRangeLiteral literal, int pos, String colName, - PartialRow key) throws ImpalaRuntimeException { - switch (type) { - case BOOL: - checkCorrectType(literal.isSetBool_literal(), type, colName, literal); - key.addBoolean(pos, literal.isBool_literal()); - break; - case INT8: - checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addByte(pos, (byte) literal.getInt_literal()); - break; - case INT16: - checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addShort(pos, (short) literal.getInt_literal()); - break; - case INT32: - checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addInt(pos, (int) literal.getInt_literal()); - break; - case INT64: - checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addLong(pos, literal.getInt_literal()); - break; - case STRING: - checkCorrectType(literal.isSetString_literal(), type, colName, literal); - key.addString(pos, literal.getString_literal()); - break; - default: - throw new ImpalaRuntimeException("Key columns not supported for type: " - + type.toString()); - } - } - - /** - * If correctType is true, returns. Otherwise throws a formatted error message - * indicating problems with the type of the literal of the range literal. - */ - private static void checkCorrectType(boolean correctType, Type t, String colName, - TRangeLiteral literal) throws ImpalaRuntimeException { - if (correctType) return; - throw new ImpalaRuntimeException( - format("Expected %s literal for column '%s' got '%s'", t.getName(), colName, - toString(literal))); - } - - /** - * Parses a string of the form "a, b, c" and returns a set of values split by ',' and - * stripped of the whitespace. - */ - public static HashSet<String> parseKeyColumns(String cols) { - return Sets.newHashSet(Splitter.on(",").trimResults().split(cols.toLowerCase())); - } - - public static List<String> parseKeyColumnsAsList(String cols) { - return Lists.newArrayList(Splitter.on(",").trimResults().split(cols.toLowerCase())); - } - - /** - * Converts a given Impala catalog type to the Kudu type. Throws an exception if the - * type cannot be converted. - */ - public static Type fromImpalaType(com.cloudera.impala.catalog.Type t) - throws ImpalaRuntimeException { - if (!t.isScalarType()) { - throw new ImpalaRuntimeException(format( - "Non-scalar type %s is not supported in Kudu", t.toSql())); - } - ScalarType s = (ScalarType) t; - switch (s.getPrimitiveType()) { - case TINYINT: return Type.INT8; - case SMALLINT: return Type.INT16; - case INT: return Type.INT32; - case BIGINT: return Type.INT64; - case BOOLEAN: return Type.BOOL; - case CHAR: return Type.STRING; - case STRING: return Type.STRING; - case VARCHAR: return Type.STRING; - case DOUBLE: return Type.DOUBLE; - case FLOAT: return Type.FLOAT; - /* Fall through below */ - case INVALID_TYPE: - case NULL_TYPE: - case TIMESTAMP: - case BINARY: - case DATE: - case DATETIME: - case DECIMAL: - default: - throw new ImpalaRuntimeException(format( - "Type %s is not supported in Kudu", s.toSql())); - } - } - - /** - * Returns the string value of the RANGE literal. 
- */ - static String toString(TRangeLiteral l) throws ImpalaRuntimeException { - if (l.isSetBool_literal()) return String.valueOf(l.bool_literal); - if (l.isSetString_literal()) return String.valueOf(l.string_literal); - if (l.isSetInt_literal()) return String.valueOf(l.int_literal); - throw new ImpalaRuntimeException("Unsupported type for RANGE literal."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/ListMap.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/ListMap.java b/fe/src/main/java/com/cloudera/impala/util/ListMap.java deleted file mode 100644 index 989a510..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/ListMap.java +++ /dev/null @@ -1,77 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Lists; - -/** - * Implementation of a bi-directional map between an index of type - * Integer and an object of type T. The indices are allocated on - * demand when a reverse lookup occurs for an object not already in - * the map. - * - * The forward mapping is implemented as a List<> so that it can be - * directly used as a Thrift structure. - */ -public class ListMap<T> { - // Maps from Integer to T. - private ArrayList<T> list_ = Lists.newArrayList(); - // Maps from T to Integer. - private final Map<T, Integer> map_ = Maps.newHashMap(); - - public ArrayList<T> getList() { return list_; } - public int size() { return list_.size(); } - - /** - * Map from Integer index to T object. - */ - public T getEntry(int index) { return list_.get(index); } - - /** - * Map from T t to Integer index. If the mapping from t doesn't - * exist, then create a new mapping from t to a unique index. - */ - public int getIndex(T t) { - Integer index = map_.get(t); - if (index == null) { - // No match was found, add a new entry. - list_.add(t); - index = list_.size() - 1; - map_.put(t, index); - } - return index; - } - - /** - * Populate the bi-map from the given list. Does not perform a copy - * of the list. 
- */ - public void populate(ArrayList<T> list) { - Preconditions.checkState(list_.isEmpty() && map_.isEmpty()); - list_ = list; - for (int i = 0; i < list_.size(); ++i) { - map_.put(list_.get(i), i); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java deleted file mode 100644 index ac85ff8..0000000 --- a/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.cloudera.impala.util; - -import com.cloudera.impala.planner.NestedLoopJoinNode; -import com.cloudera.impala.planner.HashJoinNode; -import com.cloudera.impala.planner.PlanNode; -import com.cloudera.impala.planner.ScanNode; - -/** - * Returns the maximum number of rows processed by any node in a given plan tree - */ -public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { - - private boolean abort_ = false; - private long result_ = -1l; - - @Override - public void visit(PlanNode caller) { - if (abort_) return; - - if (caller instanceof ScanNode) { - long tmp = caller.getInputCardinality(); - ScanNode scan = (ScanNode) caller; - boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats(); - // In the absence of collection stats, treat scans on collections as if they - // have no limit. - if (scan.isAccessingCollectionType() || (missingStats && !scan.hasLimit())) { - abort_ = true; - return; - } - result_ = Math.max(result_, tmp); - } else if (caller instanceof HashJoinNode || caller instanceof NestedLoopJoinNode) { - // Revisit when multiple scan nodes can be executed in a single fragment, IMPALA-561 - abort_ = true; - return; - } else { - long in = caller.getInputCardinality(); - long out = caller.getCardinality(); - if ((in == -1) || (out == -1)) { - abort_ = true; - return; - } - result_ = Math.max(result_, Math.max(in, out)); - } - } - - public long get() { - return abort_ ? -1 : result_; - } -}
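For readers skimming the removed KuduUtil.parseSplits(Schema, String) above: its javadoc expects split keys as a JSON array of arrays, with one inner array per split and int or string values only. The standalone sketch below is not part of the commit; the class name SplitKeysFormatDemo is invented, and it assumes a javax.json implementation (for example the Glassfish reference implementation) is on the classpath. It only illustrates that input shape, using the same javax.json calls the removed code relied on, without touching Kudu itself.

import java.io.StringReader;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonReader;

public class SplitKeysFormatDemo {
  public static void main(String[] args) {
    // Two split keys for an (INT, STRING) primary key, in the
    // '[[value1_col1, value1_col2], [value2_col1, value2_col2]]' shape
    // that the removed parseSplits(Schema, String) expects.
    String kuduSplits = "[[1000, \"m\"], [2000, \"t\"]]";
    try (JsonReader jr = Json.createReader(new StringReader(kuduSplits))) {
      JsonArray keysList = jr.readArray();
      for (int i = 0; i < keysList.size(); i++) {
        JsonArray compoundKey = keysList.getJsonArray(i);
        // parseSplits would copy each value into a Kudu PartialRow here.
        System.out.println("split " + i + ": " + compoundKey.getInt(0)
            + ", " + compoundKey.getString(1));
      }
    }
  }
}

In the removed code, an inner array whose length does not match the number of primary key columns, or a value of an unsupported type, is rejected with the "Error parsing splits keys." message.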

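Similarly, the removed ListMap is self-contained enough to show in isolation. A minimal sketch of the on-demand index allocation its javadoc describes; the ListMapDemo class name is invented, and it assumes the deleted com.cloudera.impala.util.ListMap and its Guava dependency are still on the classpath.

import com.cloudera.impala.util.ListMap;

public class ListMapDemo {
  public static void main(String[] args) {
    ListMap<String> map = new ListMap<>();
    // Reverse lookups allocate indices on demand, in first-seen order.
    int a = map.getIndex("alpha");      // 0
    int b = map.getIndex("beta");       // 1
    int aAgain = map.getIndex("alpha"); // still 0; no new entry is added
    // Forward lookups go through the backing list, which is the part
    // the javadoc says can be used directly as a Thrift structure.
    System.out.println(a + " " + b + " " + aAgain + " "
        + map.getEntry(0) + " size=" + map.size());
  }
}

Given the implementation above, this should print "0 1 0 alpha size=2".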