Repository: hive
Updated Branches:
  refs/heads/llap 7b9096a92 -> 0afaa8f6d
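This commit removes the wrapper classes (org.apache.hive.llap.ext.LlapInputSplit and the llap-client LlapInputFormat) and folds their logic into LlapBaseInputFormat, so external clients work directly with org.apache.hadoop.hive.llap.LlapInputSplit; the get_splits UDTF correspondingly now emits a single binary "split" column. As a minimal sketch of the resulting client flow - fetch the serialized splits over JDBC and rehydrate them with readFields(), the same way the reworked LlapBaseInputFormat.getSplits() below does - note that the JDBC URL, the "hive" user, and the exact SQL wrapping of the UDTF call are illustrative (patterned on the removed udtf_get_splits.q test), not prescribed by the patch:

    import java.io.DataInputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.llap.LlapInputSplit;

    public class GetSplitsSketch {
      public static List<LlapInputSplit> fetchSplits(String query, int numSplits) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // get_splits now returns one binary "split" column per row (see GenericUDTFGetSplits below).
        String sql = "select r1 from (select get_splits(\"" + query + "\", " + numSplits + ") as (r1)) t";
        List<LlapInputSplit> splits = new ArrayList<LlapInputSplit>();
        Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
        Statement stmt = con.createStatement();
        ResultSet res = stmt.executeQuery(sql);
        while (res.next()) {
          // Each row's binary payload is a serialized org.apache.hadoop.hive.llap.LlapInputSplit.
          LlapInputSplit split = new LlapInputSplit();
          split.readFields(new DataInputStream(res.getBinaryStream(1)));
          splits.add(split);
        }
        res.close();
        stmt.close();
        con.close();
        return splits;
      }
    }

The resulting LlapInputSplit objects can then be handed to LlapBaseInputFormat.getRecordReader() or to LlapRowInputFormat, as in the read-loop sketch at the end of this message.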
HIVE-13550: Get rid of wrapped LlapInputSplit/InputFormat classes

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0afaa8f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0afaa8f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0afaa8f6

Branch: refs/heads/llap
Commit: 0afaa8f6dc60d51a01ba8085b1cb89624eafd3d0
Parents: 7b9096a
Author: Jason Dere <[email protected]>
Authored: Tue Apr 19 15:09:33 2016 -0700
Committer: Jason Dere <[email protected]>
Committed: Tue Apr 19 15:09:33 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/ext/TestLlapInputSplit.java       |   26 +-
 .../hadoop/hive/llap/LlapInputFormat.java       |  392 ----
 llap-ext-client/pom.xml                         |   33 +
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |  351 ++-
 .../hadoop/hive/llap/LlapRowInputFormat.java    |    4 +-
 .../apache/hive/llap/ext/LlapInputSplit.java    |   73 -
 .../hadoop/hive/ql/exec/FunctionRegistry.java   |    1 -
 .../udf/generic/GenericUDTFExecuteSplits.java   |  124 -
 .../ql/udf/generic/GenericUDTFGetSplits.java    |   10 +-
 .../queries/clientpositive/udtf_get_splits.q    |   43 -
 .../clientpositive/llap/udtf_get_splits.q.out   | 2130 ------------------
 .../clientpositive/tez/udf_get_splits.q.out     |   73 -
 12 files changed, 389 insertions(+), 2871 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
index 04da17e..8264190 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
@@ -10,6 +10,7 @@ import java.util.HashMap;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
 import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.TypeDesc;
@@ -40,7 +41,7 @@ public class TestLlapInputSplit {
     colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
     Schema schema = new Schema(colDescs);
 
-    org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
+    LlapInputSplit split1 = new LlapInputSplit(
         splitNum,
         planBytes,
         fragmentBytes,
@@ -52,35 +53,18 @@ public class TestLlapInputSplit {
     split1.write(dataOut);
     ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
     DataInputStream dataIn = new DataInputStream(byteInStream);
-    org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
+    LlapInputSplit split2 = new LlapInputSplit();
     split2.readFields(dataIn);
 
     // Did we read all the data?
assertEquals(0, byteInStream.available()); checkLlapSplits(split1, split2); - - // Try JDBC LlapInputSplits - org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit1 = - new org.apache.hive.llap.ext.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat"); - byteOutStream.reset(); - jdbcSplit1.write(dataOut); - byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); - dataIn = new DataInputStream(byteInStream); - org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.llap.ext.LlapInputSplit<Text>(); - jdbcSplit2.readFields(dataIn); - - assertEquals(0, byteInStream.available()); - - checkLlapSplits( - (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(), - (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit()); - assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass()); } static void checkLlapSplits( - org.apache.hadoop.hive.llap.LlapInputSplit split1, - org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception { + LlapInputSplit split1, + LlapInputSplit split2) throws Exception { assertEquals(split1.getSplitNum(), split2.getSplitNum()); assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes()); http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java deleted file mode 100644 index 0930d60..0000000 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.llap; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; - -import org.apache.commons.collections4.ListUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; -import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; -import org.apache.hadoop.hive.llap.registry.ServiceInstance; -import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; -import org.apache.hadoop.hive.llap.tez.Converters; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.tez.common.security.JobTokenIdentifier; -import org.apache.tez.common.security.TokenCache; -import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; -import org.apache.tez.runtime.api.impl.TaskSpec; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; -import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; - - -public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { - - private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class); - - public LlapInputFormat() { - } - - /* - * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire - * off the work in the split to LLAP and finally return the connected socket back in an - * LlapRecordReader. The LlapRecordReader class reads the results from the socket. - */ - @Override - public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - - LlapInputSplit llapSplit = (LlapInputSplit) split; - - // Set conf to use LLAP user rather than current user for LLAP Zk registry. 
- HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser()); - SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); - - ServiceInstance serviceInstance = getServiceInstance(job, llapSplit); - String host = serviceInstance.getHost(); - int llapSubmitPort = serviceInstance.getRpcPort(); - - LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort - + " and outputformat port " + serviceInstance.getOutputFormatPort()); - - LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = - new LlapRecordReaderTaskUmbilicalExternalResponder(); - LlapTaskUmbilicalExternalClient llapClient = - new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), - submitWorkInfo.getToken(), umbilicalResponder); - llapClient.init(job); - llapClient.start(); - - SubmitWorkRequestProto submitWorkRequestProto = - constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), - llapClient.getAddress(), submitWorkInfo.getToken()); - - TezEvent tezEvent = new TezEvent(); - DataInputBuffer dib = new DataInputBuffer(); - dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length); - tezEvent.readFields(dib); - List<TezEvent> tezEventList = Lists.newArrayList(); - tezEventList.add(tezEvent); - - llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList); - - String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); - - HiveConf conf = new HiveConf(); - Socket socket = new Socket(host, - serviceInstance.getOutputFormatPort()); - - LOG.debug("Socket connected"); - - socket.getOutputStream().write(id.getBytes()); - socket.getOutputStream().write(0); - socket.getOutputStream().flush(); - - LOG.info("Registered id: " + id); - - LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); - umbilicalResponder.setRecordReader(recordReader); - return recordReader; - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - throw new IOException("These are not the splits you are looking for."); - } - - private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { - LlapRegistryService registryService = LlapRegistryService.getClient(job); - String host = llapSplit.getLocations()[0]; - - ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); - if (serviceInstance == null) { - throw new IOException("No service instances found for " + host + " in registry"); - } - - return serviceInstance; - } - - private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { - InetAddress address = InetAddress.getByName(host); - ServiceInstanceSet instanceSet = registryService.getInstances(); - ServiceInstance serviceInstance = null; - - // The name used in the service registry may not match the host name we're using. 
- // Try hostname/canonical hostname/host address - - String name = address.getHostName(); - LOG.info("Searching service instance by hostname " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - name = address.getCanonicalHostName(); - LOG.info("Searching service instance by canonical hostname " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - name = address.getHostAddress(); - LOG.info("Searching service instance by address " + name); - serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); - if (serviceInstance != null) { - return serviceInstance; - } - - return serviceInstance; - } - - private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) { - if (serviceInstances == null || serviceInstances.isEmpty()) { - return null; - } - - // Get the first live service instance - for (ServiceInstance serviceInstance : serviceInstances) { - if (serviceInstance.isAlive()) { - return serviceInstance; - } - } - - LOG.info("No live service instances were found"); - return null; - } - - private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, - int taskNum, - InetSocketAddress address, - Token<JobTokenIdentifier> token) throws - IOException { - TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); - ApplicationId appId = submitWorkInfo.getFakeAppId(); - - SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); - // This works, assuming the executor is running within YARN. - LOG.info("Setting user in submitWorkRequest to: " + - System.getenv(ApplicationConstants.Environment.USER.name())); - builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); - builder.setApplicationIdString(appId.toString()); - builder.setAppAttemptNumber(0); - builder.setTokenIdentifier(appId.toString()); - - ContainerId containerId = - ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); - builder.setContainerIdString(containerId.toString()); - - builder.setAmHost(address.getHostName()); - builder.setAmPort(address.getPort()); - Credentials taskCredentials = new Credentials(); - // Credentials can change across DAGs. Ideally construct only once per DAG. - // TODO Figure out where credentials will come from. Normally Hive sets up - // URLs on the tez dag, for which Tez acquires credentials. 
- - // taskCredentials.addAll(getContext().getCredentials()); - - // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == - // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); - // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); - // if (credentialsBinary == null) { - // credentialsBinary = serializeCredentials(getContext().getCredentials()); - // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); - // } else { - // credentialsBinary = credentialsBinary.duplicate(); - // } - // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); - Credentials credentials = new Credentials(); - TokenCache.setSessionToken(token, credentials); - ByteBuffer credentialsBinary = serializeCredentials(credentials); - builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); - - - builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); - - FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder(); - runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis()); - runtimeInfo.setWithinDagPriority(0); - runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime()); - runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime()); - runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism()); - runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); - - - builder.setUsingTezAm(false); - builder.setFragmentRuntimeInfo(runtimeInfo.build()); - return builder.build(); - } - - private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { - Credentials containerCredentials = new Credentials(); - containerCredentials.addAll(credentials); - DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); - containerCredentials.writeTokenStorageToStream(containerTokens_dob); - return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); - } - - private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { - protected LlapBaseRecordReader recordReader = null; - protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>(); - - public LlapRecordReaderTaskUmbilicalExternalResponder() { - } - - @Override - public void submissionFailed(String fragmentId, Throwable throwable) { - try { - sendOrQueueEvent(ReaderEvent.errorEvent( - "Received submission failed event for fragment ID " + fragmentId)); - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - - @Override - public void heartbeat(TezHeartbeatRequest request) { - TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); - List<TezEvent> inEvents = request.getEvents(); - for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { - EventType eventType = tezEvent.getEventType(); - try { - switch (eventType) { - case TASK_ATTEMPT_COMPLETED_EVENT: - sendOrQueueEvent(ReaderEvent.doneEvent()); - break; - case TASK_ATTEMPT_FAILED_EVENT: - TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent(); - sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); - break; - case TASK_STATUS_UPDATE_EVENT: - // If we want to handle counters - break; - default: - LOG.warn("Unhandled event type " + eventType); - break; - } - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - } - - @Override - public void taskKilled(TezTaskAttemptID 
taskAttemptId) { - try { - sendOrQueueEvent(ReaderEvent.errorEvent( - "Received task killed event for task ID " + taskAttemptId)); - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - - @Override - public void heartbeatTimeout(String taskAttemptId) { - try { - sendOrQueueEvent(ReaderEvent.errorEvent( - "Timed out waiting for heartbeat for task ID " + taskAttemptId)); - } catch (Exception err) { - LOG.error("Error during heartbeat responder:", err); - } - } - - public synchronized LlapBaseRecordReader getRecordReader() { - return recordReader; - } - - public synchronized void setRecordReader(LlapBaseRecordReader recordReader) { - this.recordReader = recordReader; - - if (recordReader == null) { - return; - } - - // If any events were queued by the responder, give them to the record reader now. - while (!queuedEvents.isEmpty()) { - ReaderEvent readerEvent = queuedEvents.poll(); - LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType()); - recordReader.handleEvent(readerEvent); - } - } - - /** - * Send the ReaderEvents to the record reader, if it is registered to this responder. - * If there is no registered record reader, add them to a list of pending reader events - * since we don't want to drop these events. - * @param readerEvent - */ - protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) { - LlapBaseRecordReader recordReader = getRecordReader(); - if (recordReader != null) { - recordReader.handleEvent(readerEvent); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() - + " with message " + readerEvent.getMessage()); - } - - try { - queuedEvents.put(readerEvent); - } catch (Exception err) { - throw new RuntimeException("Unexpected exception while queueing reader event", err); - } - } - } - - /** - * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader. 
- */ - public void clearQueuedEvents() { - queuedEvents.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/pom.xml ---------------------------------------------------------------------- diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml index 5a7e385..fdf16cd 100644 --- a/llap-ext-client/pom.xml +++ b/llap-ext-client/pom.xml @@ -74,6 +74,39 @@ <version>${hadoop.version}</version> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + <version>${tez.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-runtime-internals</artifactId> + <version>${tez.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- test inter-project --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 61eb2ea..10d14c0 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.llap; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import java.sql.SQLException; import java.sql.Connection; @@ -30,8 +32,28 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.DataInputStream; import java.io.ByteArrayInputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; + +import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapBaseRecordReader; +import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent; +import org.apache.hadoop.hive.llap.LlapInputSplit; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient; +import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import 
org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.NullWritable; @@ -44,16 +66,39 @@ import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; -import org.apache.hive.llap.ext.LlapInputSplit; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; + +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; +import org.apache.tez.runtime.api.impl.EventType; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; + public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> { + private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class); + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; private String url; // "jdbc:hive2://localhost:10000/default" private String user; // "hive", @@ -82,8 +127,58 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF @Override public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + LlapInputSplit llapSplit = (LlapInputSplit) split; - return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter); + + // Set conf to use LLAP user rather than current user for LLAP Zk registry. 
+ HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser()); + SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes()); + + ServiceInstance serviceInstance = getServiceInstance(job, llapSplit); + String host = serviceInstance.getHost(); + int llapSubmitPort = serviceInstance.getRpcPort(); + + LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort + + " and outputformat port " + serviceInstance.getOutputFormatPort()); + + LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = + new LlapRecordReaderTaskUmbilicalExternalResponder(); + LlapTaskUmbilicalExternalClient llapClient = + new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), + submitWorkInfo.getToken(), umbilicalResponder); + llapClient.init(job); + llapClient.start(); + + SubmitWorkRequestProto submitWorkRequestProto = + constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(), + llapClient.getAddress(), submitWorkInfo.getToken()); + + TezEvent tezEvent = new TezEvent(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length); + tezEvent.readFields(dib); + List<TezEvent> tezEventList = Lists.newArrayList(); + tezEventList.add(tezEvent); + + llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList); + + String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); + + HiveConf conf = new HiveConf(); + Socket socket = new Socket(host, + serviceInstance.getOutputFormatPort()); + + LOG.debug("Socket connected"); + + socket.getOutputStream().write(id.getBytes()); + socket.getOutputStream().write(0); + socket.getOutputStream().flush(); + + LOG.info("Registered id: " + id); + + LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class); + umbilicalResponder.setRecordReader(recordReader); + return recordReader; } @Override @@ -112,10 +207,10 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF ResultSet res = stmt.executeQuery(sql); while (res.next()) { // deserialize split - DataInput in = new DataInputStream(res.getBinaryStream(3)); - InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); + DataInput in = new DataInputStream(res.getBinaryStream(1)); + InputSplitWithLocationInfo is = new LlapInputSplit(); is.readFields(in); - ins.add(new LlapInputSplit(is, res.getString(1))); + ins.add(is); } res.close(); @@ -133,4 +228,250 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF // ignore } } + + private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { + LlapRegistryService registryService = LlapRegistryService.getClient(job); + String host = llapSplit.getLocations()[0]; + + ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host); + if (serviceInstance == null) { + throw new IOException("No service instances found for " + host + " in registry"); + } + + return serviceInstance; + } + + private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException { + InetAddress address = InetAddress.getByName(host); + ServiceInstanceSet instanceSet = registryService.getInstances(); + ServiceInstance serviceInstance = null; + + // The name used in the service registry may not match the host 
name we're using. + // Try hostname/canonical hostname/host address + + String name = address.getHostName(); + LOG.info("Searching service instance by hostname " + name); + serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); + if (serviceInstance != null) { + return serviceInstance; + } + + name = address.getCanonicalHostName(); + LOG.info("Searching service instance by canonical hostname " + name); + serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); + if (serviceInstance != null) { + return serviceInstance; + } + + name = address.getHostAddress(); + LOG.info("Searching service instance by address " + name); + serviceInstance = selectServiceInstance(instanceSet.getByHost(name)); + if (serviceInstance != null) { + return serviceInstance; + } + + return serviceInstance; + } + + private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) { + if (serviceInstances == null || serviceInstances.isEmpty()) { + return null; + } + + // Get the first live service instance + for (ServiceInstance serviceInstance : serviceInstances) { + if (serviceInstance.isAlive()) { + return serviceInstance; + } + } + + LOG.info("No live service instances were found"); + return null; + } + + private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, + int taskNum, + InetSocketAddress address, + Token<JobTokenIdentifier> token) throws + IOException { + TaskSpec taskSpec = submitWorkInfo.getTaskSpec(); + ApplicationId appId = submitWorkInfo.getFakeAppId(); + + SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); + // This works, assuming the executor is running within YARN. + LOG.info("Setting user in submitWorkRequest to: " + + System.getenv(ApplicationConstants.Environment.USER.name())); + builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); + builder.setApplicationIdString(appId.toString()); + builder.setAppAttemptNumber(0); + builder.setTokenIdentifier(appId.toString()); + + ContainerId containerId = + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); + builder.setContainerIdString(containerId.toString()); + + builder.setAmHost(address.getHostName()); + builder.setAmPort(address.getPort()); + Credentials taskCredentials = new Credentials(); + // Credentials can change across DAGs. Ideally construct only once per DAG. + // TODO Figure out where credentials will come from. Normally Hive sets up + // URLs on the tez dag, for which Tez acquires credentials. 
+ + // taskCredentials.addAll(getContext().getCredentials()); + + // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == + // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); + // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); + // if (credentialsBinary == null) { + // credentialsBinary = serializeCredentials(getContext().getCredentials()); + // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); + // } else { + // credentialsBinary = credentialsBinary.duplicate(); + // } + // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + Credentials credentials = new Credentials(); + TokenCache.setSessionToken(token, credentials); + ByteBuffer credentialsBinary = serializeCredentials(credentials); + builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); + + + builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); + + FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder(); + runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis()); + runtimeInfo.setWithinDagPriority(0); + runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime()); + runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime()); + runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism()); + runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); + + + builder.setUsingTezAm(false); + builder.setFragmentRuntimeInfo(runtimeInfo.build()); + return builder.build(); + } + + private ByteBuffer serializeCredentials(Credentials credentials) throws IOException { + Credentials containerCredentials = new Credentials(); + containerCredentials.addAll(credentials); + DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); + containerCredentials.writeTokenStorageToStream(containerTokens_dob); + return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); + } + + private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder { + protected LlapBaseRecordReader recordReader = null; + protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>(); + + public LlapRecordReaderTaskUmbilicalExternalResponder() { + } + + @Override + public void submissionFailed(String fragmentId, Throwable throwable) { + try { + sendOrQueueEvent(ReaderEvent.errorEvent( + "Received submission failed event for fragment ID " + fragmentId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + @Override + public void heartbeat(TezHeartbeatRequest request) { + TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID(); + List<TezEvent> inEvents = request.getEvents(); + for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + EventType eventType = tezEvent.getEventType(); + try { + switch (eventType) { + case TASK_ATTEMPT_COMPLETED_EVENT: + sendOrQueueEvent(ReaderEvent.doneEvent()); + break; + case TASK_ATTEMPT_FAILED_EVENT: + TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent(); + sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics())); + break; + case TASK_STATUS_UPDATE_EVENT: + // If we want to handle counters + break; + default: + LOG.warn("Unhandled event type " + eventType); + break; + } + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + } + + @Override + public void taskKilled(TezTaskAttemptID 
taskAttemptId) { + try { + sendOrQueueEvent(ReaderEvent.errorEvent( + "Received task killed event for task ID " + taskAttemptId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + @Override + public void heartbeatTimeout(String taskAttemptId) { + try { + sendOrQueueEvent(ReaderEvent.errorEvent( + "Timed out waiting for heartbeat for task ID " + taskAttemptId)); + } catch (Exception err) { + LOG.error("Error during heartbeat responder:", err); + } + } + + public synchronized LlapBaseRecordReader getRecordReader() { + return recordReader; + } + + public synchronized void setRecordReader(LlapBaseRecordReader recordReader) { + this.recordReader = recordReader; + + if (recordReader == null) { + return; + } + + // If any events were queued by the responder, give them to the record reader now. + while (!queuedEvents.isEmpty()) { + ReaderEvent readerEvent = queuedEvents.poll(); + LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType()); + recordReader.handleEvent(readerEvent); + } + } + + /** + * Send the ReaderEvents to the record reader, if it is registered to this responder. + * If there is no registered record reader, add them to a list of pending reader events + * since we don't want to drop these events. + * @param readerEvent + */ + protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) { + LlapBaseRecordReader recordReader = getRecordReader(); + if (recordReader != null) { + recordReader.handleEvent(readerEvent); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType() + + " with message " + readerEvent.getMessage()); + } + + try { + queuedEvents.put(readerEvent); + } catch (Exception err) { + throw new RuntimeException("Unexpected exception while queueing reader event", err); + } + } + } + + /** + * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader. 
+ */ + public void clearQueuedEvents() { + queuedEvents.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java index 6ecb0f9..56ad555 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java @@ -3,6 +3,7 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; import org.apache.hadoop.hive.llap.LlapBaseRecordReader; +import org.apache.hadoop.hive.llap.LlapInputSplit; import org.apache.hadoop.hive.llap.LlapRowRecordReader; import org.apache.hadoop.hive.llap.Row; import org.apache.hadoop.hive.llap.Schema; @@ -15,7 +16,6 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hive.llap.ext.LlapInputSplit; public class LlapRowInputFormat implements InputFormat<NullWritable, Row> { @@ -29,7 +29,7 @@ public class LlapRowInputFormat implements InputFormat<NullWritable, Row> { @Override public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split; + LlapInputSplit llapSplit = (LlapInputSplit) split; LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter); return new LlapRowRecordReader(job, reader.getSchema(), reader); } http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java b/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java deleted file mode 100644 index d8881c4..0000000 --- a/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.hive.llap.ext; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; -import org.apache.hadoop.mapred.SplitLocationInfo; - - -public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo { - InputSplitWithLocationInfo nativeSplit; - String inputFormatClassName; - - public LlapInputSplit() { - } - - public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) { - this.nativeSplit = nativeSplit; - this.inputFormatClassName = inputFormatClassName; - } - - @Override - public long getLength() throws IOException { - return nativeSplit.getLength(); - } - - @Override - public String[] getLocations() throws IOException { - return nativeSplit.getLocations(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(inputFormatClassName); - out.writeUTF(nativeSplit.getClass().getName()); - nativeSplit.write(out); - 
} - - @Override - public void readFields(DataInput in) throws IOException { - inputFormatClassName = in.readUTF(); - String splitClass = in.readUTF(); - try { - nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance(); - } catch (Exception e) { - throw new IOException(e); - } - nativeSplit.readFields(in); - } - - @Override - public SplitLocationInfo[] getLocationInfo() throws IOException { - return nativeSplit.getLocationInfo(); - } - - public InputSplit getSplit() { - return nativeSplit; - } - - public InputFormat<NullWritable, V> getInputFormat() throws IOException { - try { - return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName) - .newInstance(); - } catch(Exception e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 54bd830..6b25ce1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -447,7 +447,6 @@ public final class FunctionRegistry { system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class); system.registerGenericUDTF("stack", GenericUDTFStack.class); system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class); - system.registerGenericUDTF("execute_splits", GenericUDTFExecuteSplits.class); //PTF declarations system.registerGenericUDF(LEAD_FUNC_NAME, GenericUDFLead.class); http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java deleted file mode 100644 index 12759ab..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.udf.generic; - -import java.util.Arrays; -import java.util.List; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.UDFArgumentException; -import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; -import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.ql.udf.UDFType; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFGetSplits.PlanFragment; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.RecordReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * GenericUDTFExecuteSplits. - * - */ -@Description(name = "execute_splits", value = "_FUNC_(string,int) - " - + "Returns an array of length int serialized splits for the referenced tables string.") -@UDFType(deterministic = false) -public class GenericUDTFExecuteSplits extends GenericUDTFGetSplits { - - private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFExecuteSplits.class); - - @Override - public StructObjectInspector initialize(ObjectInspector[] arguments) - throws UDFArgumentException { - - LOG.debug("initializing ExecuteSplits"); - - if (SessionState.get() == null || SessionState.get().getConf() == null) { - throw new IllegalStateException("Cannot run execute splits outside HS2"); - } - - if (arguments.length != 2) { - throw new UDFArgumentLengthException("The function execute_splits accepts 2 arguments."); - } else if (!(arguments[0] instanceof StringObjectInspector)) { - LOG.error("Got "+arguments[0].getTypeName()+" instead of string."); - throw new UDFArgumentTypeException(0, "\"" - + "string\" is expected at function execute_splits, " + "but \"" - + arguments[0].getTypeName() + "\" is found"); - } else if (!(arguments[1] instanceof IntObjectInspector)) { - LOG.error("Got "+arguments[1].getTypeName()+" instead of int."); - throw new UDFArgumentTypeException(1, "\"" - + "int\" is expected at function execute_splits, " + "but \"" - + arguments[1].getTypeName() + "\" is found"); - } - - stringOI = (StringObjectInspector) arguments[0]; - intOI = (IntObjectInspector) arguments[1]; - - List<String> names = Arrays.asList("split_num","value"); - List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList( - PrimitiveObjectInspectorFactory.javaIntObjectInspector, - PrimitiveObjectInspectorFactory.javaStringObjectInspector); - StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); - - LOG.debug("done initializing GenericUDTFExecuteSplits"); - return outputOI; - } - - @Override - public void process(Object[] arguments) throws HiveException { - - String query = stringOI.getPrimitiveJavaObject(arguments[0]); - int num = intOI.get(arguments[1]); - - PlanFragment fragment = 
createPlanFragment(query, num); - try { - InputFormat<NullWritable, Text> format = (InputFormat<NullWritable,Text>)(Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance()); - int index = 0; - for (InputSplit s: getSplits(jc, num, fragment.work, fragment.schema)) { - RecordReader<NullWritable, Text> reader = format.getRecordReader(s,fragment.jc,null); - Text value = reader.createValue(); - NullWritable key = reader.createKey(); - index++; - while(reader.next(key,value)) { - Object[] os = new Object[2]; - os[0] = index; - os[1] = value.toString(); - forward(os); - } - } - } catch(Exception e) { - throw new HiveException(e); - } - } - - @Override - public void close() throws HiveException { - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index 51027a7..9a52c7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -150,10 +150,8 @@ public class GenericUDTFGetSplits extends GenericUDTF { stringOI = (StringObjectInspector) arguments[0]; intOI = (IntObjectInspector) arguments[1]; - List<String> names = Arrays.asList("if_class","split_class","split"); + List<String> names = Arrays.asList("split"); List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList( - PrimitiveObjectInspectorFactory.javaStringObjectInspector, - PrimitiveObjectInspectorFactory.javaStringObjectInspector, PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs); @@ -185,13 +183,11 @@ public class GenericUDTFGetSplits extends GenericUDTF { try { for (InputSplit s: getSplits(jc, num, tezWork, schema)) { - Object[] os = new Object[3]; - os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME; - os[1] = s.getClass().getName(); + Object[] os = new Object[1]; bos.reset(); s.write(dos); byte[] frozen = bos.toByteArray(); - os[2] = frozen; + os[0] = frozen; forward(os); } } catch(Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/test/queries/clientpositive/udtf_get_splits.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/udtf_get_splits.q b/ql/src/test/queries/clientpositive/udtf_get_splits.q deleted file mode 100644 index f378dca..0000000 --- a/ql/src/test/queries/clientpositive/udtf_get_splits.q +++ /dev/null @@ -1,43 +0,0 @@ -set hive.fetch.task.conversion=more; -set hive.mapred.mode=nonstrict; -set mapred.max.split.size=100; -set mapred.min.split.size.per.node=100; -set mapred.min.split.size.per.rack=100; -set mapred.max.split.size=100; -set tez.grouping.max-size=100; -set tez.grouping.min-size=100; - -DESCRIBE FUNCTION get_splits; -DESCRIBE FUNCTION execute_splits; - -select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key, count(*) from srcpart where key % 2 = 0 group by key", - 5) as (r1, r2, r3)) t; - -select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key from srcpart where key % 2 = 0", - 5) as (r1, r2, r3)) t; - -show tables; - -select r1, r2 -from - (select - execute_splits( - "select key from srcpart where 
key % 2 = 0", - 1) as (r1, r2)) t; - -select r1, r2 -from - (select - execute_splits( - "select key from srcpart where key % 2 = 0", - 5) as (r1, r2)) t; - -select count(*) from (select key from srcpart where key % 2 = 0) t; http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out b/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out deleted file mode 100644 index 2f17a91..0000000 --- a/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out +++ /dev/null @@ -1,2130 +0,0 @@ -PREHOOK: query: DESCRIBE FUNCTION get_splits -PREHOOK: type: DESCFUNCTION -POSTHOOK: query: DESCRIBE FUNCTION get_splits -POSTHOOK: type: DESCFUNCTION -get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string. -PREHOOK: query: DESCRIBE FUNCTION execute_splits -PREHOOK: type: DESCFUNCTION -POSTHOOK: query: DESCRIBE FUNCTION execute_splits -POSTHOOK: type: DESCFUNCTION -execute_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string. -PREHOOK: query: select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key, count(*) from srcpart where key % 2 = 0 group by key", - 5) as (r1, r2, r3)) t -PREHOOK: type: QUERY -PREHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -POSTHOOK: query: select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key, count(*) from srcpart where key % 2 = 0 group by key", - 5) as (r1, r2, r3)) t -POSTHOOK: type: QUERY -POSTHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -PREHOOK: query: select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key, count(*) from srcpart where key % 2 = 0 group by key", - 5) as (r1, r2, r3)) t -PREHOOK: type: QUERY -PREHOOK: Input: default@srcpart -PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 -PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 -PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 -PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 -PREHOOK: Output: database:default -PREHOOK: Output: default@#### A masked pattern was here #### -POSTHOOK: query: select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key, count(*) from srcpart where key % 2 = 0 group by key", - 5) as (r1, r2, r3)) t -POSTHOOK: type: QUERY -POSTHOOK: Input: default@srcpart -POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 -POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 -POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 -POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 -POSTHOOK: Output: database:default -POSTHOOK: Output: default@#### A masked pattern was here #### -org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 -PREHOOK: query: select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key from srcpart where key % 2 = 0", - 5) as (r1, r2, r3)) t -PREHOOK: type: QUERY -PREHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -POSTHOOK: query: select r1, r2, floor(length(r3)/100000) -from - (select - get_splits( - "select key from srcpart where key % 2 = 0", - 5) as (r1, r2, r3)) t -POSTHOOK: type: QUERY -POSTHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### 
-org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 -org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 -org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 -org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1 -PREHOOK: query: show tables -PREHOOK: type: SHOWTABLES -PREHOOK: Input: database:default -POSTHOOK: query: show tables -POSTHOOK: type: SHOWTABLES -POSTHOOK: Input: database:default -alltypesorc -cbo_t1 -cbo_t2 -cbo_t3 -lineitem -part -src -src1 -src_cbo -src_json -src_sequencefile -src_thrift -srcbucket -srcbucket2 -srcpart -#### A masked pattern was here #### -PREHOOK: query: select r1, r2 -from - (select - execute_splits( - "select key from srcpart where key % 2 = 0", - 1) as (r1, r2)) t -PREHOOK: type: QUERY -PREHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -POSTHOOK: query: select r1, r2 -from - (select - execute_splits( - "select key from srcpart where key % 2 = 0", - 1) as (r1, r2)) t -POSTHOOK: type: QUERY -POSTHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -1 238 -1 86 -1 278 -1 98 -1 484 -1 150 -1 224 -1 66 -1 128 -1 146 -1 406 -1 374 -1 152 -1 82 -1 166 -1 430 -1 252 -1 292 -1 338 -1 446 -1 394 -1 482 -1 174 -1 494 -1 466 -1 208 -1 174 -1 396 -1 162 -1 266 -1 342 -1 0 -1 128 -1 316 -1 302 -1 438 -1 170 -1 20 -1 378 -1 92 -1 72 -1 4 -1 280 -1 208 -1 356 -1 382 -1 498 -1 386 -1 192 -1 286 -1 176 -1 54 -1 138 -1 216 -1 430 -1 278 -1 176 -1 318 -1 332 -1 180 -1 284 -1 12 -1 230 -1 260 -1 404 -1 384 -1 272 -1 138 -1 84 -1 348 -1 466 -1 58 -1 8 -1 230 -1 208 -1 348 -1 24 -1 172 -1 42 -1 158 -1 496 -1 0 -1 322 -1 468 -1 454 -1 100 -1 298 -1 418 -1 96 -1 26 -1 230 -1 120 -1 404 -1 436 -1 156 -1 468 -1 308 -1 196 -1 288 -1 98 -1 282 -1 318 -1 318 -1 470 -1 316 -1 0 -1 490 -1 364 -1 118 -1 134 -1 282 -1 138 -1 238 -1 118 -1 72 -1 90 -1 10 -1 306 -1 224 -1 242 -1 392 -1 272 -1 242 -1 452 -1 226 -1 402 -1 396 -1 58 -1 336 -1 168 -1 34 -1 472 -1 322 -1 498 -1 160 -1 42 -1 430 -1 458 -1 78 -1 76 -1 492 -1 218 -1 228 -1 138 -1 30 -1 64 -1 468 -1 76 -1 74 -1 342 -1 230 -1 368 -1 296 -1 216 -1 344 -1 274 -1 116 -1 256 -1 70 -1 480 -1 288 -1 244 -1 438 -1 128 -1 432 -1 202 -1 316 -1 280 -1 2 -1 80 -1 44 -1 104 -1 466 -1 366 -1 406 -1 190 -1 406 -1 114 -1 258 -1 90 -1 262 -1 348 -1 424 -1 12 -1 396 -1 164 -1 454 -1 478 -1 298 -1 164 -1 424 -1 382 -1 70 -1 480 -1 24 -1 104 -1 70 -1 438 -1 414 -1 200 -1 360 -1 248 -1 444 -1 120 -1 230 -1 478 -1 178 -1 468 -1 310 -1 460 -1 480 -1 136 -1 172 -1 214 -1 462 -1 406 -1 454 -1 384 -1 256 -1 26 -1 134 -1 384 -1 18 -1 462 -1 492 -1 100 -1 298 -1 498 -1 146 -1 458 -1 362 -1 186 -1 348 -1 18 -1 344 -1 84 -1 28 -1 448 -1 152 -1 348 -1 194 -1 414 -1 222 -1 126 -1 90 -1 400 -1 200 -2 238 -2 86 -2 278 -2 98 -2 484 -2 150 -2 224 -2 66 -2 128 -2 146 -2 406 -2 374 -2 152 -2 82 -2 166 -2 430 -2 252 -2 292 -2 338 -2 446 -2 394 -2 482 -2 174 -2 494 -2 466 -2 208 -2 174 -2 396 -2 162 -2 266 -2 342 -2 0 -2 128 -2 316 -2 302 -2 438 -2 170 -2 20 -2 378 -2 92 -2 72 -2 4 -2 280 -2 208 -2 356 -2 382 -2 498 -2 386 -2 192 -2 286 -2 176 -2 54 -2 138 -2 216 -2 430 -2 278 -2 176 -2 318 -2 332 -2 180 -2 284 -2 12 -2 230 -2 260 -2 404 -2 384 -2 272 -2 138 -2 84 -2 348 -2 466 -2 58 -2 8 -2 230 -2 208 -2 348 -2 24 -2 172 -2 42 -2 158 -2 496 -2 0 -2 322 -2 468 -2 454 -2 100 -2 298 -2 418 -2 96 -2 26 -2 230 -2 120 -2 404 -2 436 -2 156 -2 468 -2 308 -2 196 -2 288 -2 98 
-2 282 -2 318 -2 318 -2 470 -2 316 -2 0 -2 490 -2 364 -2 118 -2 134 -2 282 -2 138 -2 238 -2 118 -2 72 -2 90 -2 10 -2 306 -2 224 -2 242 -2 392 -2 272 -2 242 -2 452 -2 226 -2 402 -2 396 -2 58 -2 336 -2 168 -2 34 -2 472 -2 322 -2 498 -2 160 -2 42 -2 430 -2 458 -2 78 -2 76 -2 492 -2 218 -2 228 -2 138 -2 30 -2 64 -2 468 -2 76 -2 74 -2 342 -2 230 -2 368 -2 296 -2 216 -2 344 -2 274 -2 116 -2 256 -2 70 -2 480 -2 288 -2 244 -2 438 -2 128 -2 432 -2 202 -2 316 -2 280 -2 2 -2 80 -2 44 -2 104 -2 466 -2 366 -2 406 -2 190 -2 406 -2 114 -2 258 -2 90 -2 262 -2 348 -2 424 -2 12 -2 396 -2 164 -2 454 -2 478 -2 298 -2 164 -2 424 -2 382 -2 70 -2 480 -2 24 -2 104 -2 70 -2 438 -2 414 -2 200 -2 360 -2 248 -2 444 -2 120 -2 230 -2 478 -2 178 -2 468 -2 310 -2 460 -2 480 -2 136 -2 172 -2 214 -2 462 -2 406 -2 454 -2 384 -2 256 -2 26 -2 134 -2 384 -2 18 -2 462 -2 492 -2 100 -2 298 -2 498 -2 146 -2 458 -2 362 -2 186 -2 348 -2 18 -2 344 -2 84 -2 28 -2 448 -2 152 -2 348 -2 194 -2 414 -2 222 -2 126 -2 90 -2 400 -2 200 -3 238 -3 86 -3 278 -3 98 -3 484 -3 150 -3 224 -3 66 -3 128 -3 146 -3 406 -3 374 -3 152 -3 82 -3 166 -3 430 -3 252 -3 292 -3 338 -3 446 -3 394 -3 482 -3 174 -3 494 -3 466 -3 208 -3 174 -3 396 -3 162 -3 266 -3 342 -3 0 -3 128 -3 316 -3 302 -3 438 -3 170 -3 20 -3 378 -3 92 -3 72 -3 4 -3 280 -3 208 -3 356 -3 382 -3 498 -3 386 -3 192 -3 286 -3 176 -3 54 -3 138 -3 216 -3 430 -3 278 -3 176 -3 318 -3 332 -3 180 -3 284 -3 12 -3 230 -3 260 -3 404 -3 384 -3 272 -3 138 -3 84 -3 348 -3 466 -3 58 -3 8 -3 230 -3 208 -3 348 -3 24 -3 172 -3 42 -3 158 -3 496 -3 0 -3 322 -3 468 -3 454 -3 100 -3 298 -3 418 -3 96 -3 26 -3 230 -3 120 -3 404 -3 436 -3 156 -3 468 -3 308 -3 196 -3 288 -3 98 -3 282 -3 318 -3 318 -3 470 -3 316 -3 0 -3 490 -3 364 -3 118 -3 134 -3 282 -3 138 -3 238 -3 118 -3 72 -3 90 -3 10 -3 306 -3 224 -3 242 -3 392 -3 272 -3 242 -3 452 -3 226 -3 402 -3 396 -3 58 -3 336 -3 168 -3 34 -3 472 -3 322 -3 498 -3 160 -3 42 -3 430 -3 458 -3 78 -3 76 -3 492 -3 218 -3 228 -3 138 -3 30 -3 64 -3 468 -3 76 -3 74 -3 342 -3 230 -3 368 -3 296 -3 216 -3 344 -3 274 -3 116 -3 256 -3 70 -3 480 -3 288 -3 244 -3 438 -3 128 -3 432 -3 202 -3 316 -3 280 -3 2 -3 80 -3 44 -3 104 -3 466 -3 366 -3 406 -3 190 -3 406 -3 114 -3 258 -3 90 -3 262 -3 348 -3 424 -3 12 -3 396 -3 164 -3 454 -3 478 -3 298 -3 164 -3 424 -3 382 -3 70 -3 480 -3 24 -3 104 -3 70 -3 438 -3 414 -3 200 -3 360 -3 248 -3 444 -3 120 -3 230 -3 478 -3 178 -3 468 -3 310 -3 460 -3 480 -3 136 -3 172 -3 214 -3 462 -3 406 -3 454 -3 384 -3 256 -3 26 -3 134 -3 384 -3 18 -3 462 -3 492 -3 100 -3 298 -3 498 -3 146 -3 458 -3 362 -3 186 -3 348 -3 18 -3 344 -3 84 -3 28 -3 448 -3 152 -3 348 -3 194 -3 414 -3 222 -3 126 -3 90 -3 400 -3 200 -4 238 -4 86 -4 278 -4 98 -4 484 -4 150 -4 224 -4 66 -4 128 -4 146 -4 406 -4 374 -4 152 -4 82 -4 166 -4 430 -4 252 -4 292 -4 338 -4 446 -4 394 -4 482 -4 174 -4 494 -4 466 -4 208 -4 174 -4 396 -4 162 -4 266 -4 342 -4 0 -4 128 -4 316 -4 302 -4 438 -4 170 -4 20 -4 378 -4 92 -4 72 -4 4 -4 280 -4 208 -4 356 -4 382 -4 498 -4 386 -4 192 -4 286 -4 176 -4 54 -4 138 -4 216 -4 430 -4 278 -4 176 -4 318 -4 332 -4 180 -4 284 -4 12 -4 230 -4 260 -4 404 -4 384 -4 272 -4 138 -4 84 -4 348 -4 466 -4 58 -4 8 -4 230 -4 208 -4 348 -4 24 -4 172 -4 42 -4 158 -4 496 -4 0 -4 322 -4 468 -4 454 -4 100 -4 298 -4 418 -4 96 -4 26 -4 230 -4 120 -4 404 -4 436 -4 156 -4 468 -4 308 -4 196 -4 288 -4 98 -4 282 -4 318 -4 318 -4 470 -4 316 -4 0 -4 490 -4 364 -4 118 -4 134 -4 282 -4 138 -4 238 -4 118 -4 72 -4 90 -4 10 -4 306 -4 224 -4 242 -4 392 -4 272 -4 242 -4 452 -4 226 -4 402 -4 396 -4 58 -4 336 -4 168 
-4 34 -4 472 -4 322 -4 498 -4 160 -4 42 -4 430 -4 458 -4 78 -4 76 -4 492 -4 218 -4 228 -4 138 -4 30 -4 64 -4 468 -4 76 -4 74 -4 342 -4 230 -4 368 -4 296 -4 216 -4 344 -4 274 -4 116 -4 256 -4 70 -4 480 -4 288 -4 244 -4 438 -4 128 -4 432 -4 202 -4 316 -4 280 -4 2 -4 80 -4 44 -4 104 -4 466 -4 366 -4 406 -4 190 -4 406 -4 114 -4 258 -4 90 -4 262 -4 348 -4 424 -4 12 -4 396 -4 164 -4 454 -4 478 -4 298 -4 164 -4 424 -4 382 -4 70 -4 480 -4 24 -4 104 -4 70 -4 438 -4 414 -4 200 -4 360 -4 248 -4 444 -4 120 -4 230 -4 478 -4 178 -4 468 -4 310 -4 460 -4 480 -4 136 -4 172 -4 214 -4 462 -4 406 -4 454 -4 384 -4 256 -4 26 -4 134 -4 384 -4 18 -4 462 -4 492 -4 100 -4 298 -4 498 -4 146 -4 458 -4 362 -4 186 -4 348 -4 18 -4 344 -4 84 -4 28 -4 448 -4 152 -4 348 -4 194 -4 414 -4 222 -4 126 -4 90 -4 400 -4 200 -PREHOOK: query: select r1, r2 -from - (select - execute_splits( - "select key from srcpart where key % 2 = 0", - 5) as (r1, r2)) t -PREHOOK: type: QUERY -PREHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -POSTHOOK: query: select r1, r2 -from - (select - execute_splits( - "select key from srcpart where key % 2 = 0", - 5) as (r1, r2)) t -POSTHOOK: type: QUERY -POSTHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -1 238 -1 86 -1 278 -1 98 -1 484 -1 150 -1 224 -1 66 -1 128 -1 146 -1 406 -1 374 -1 152 -1 82 -1 166 -1 430 -1 252 -1 292 -1 338 -1 446 -1 394 -1 482 -1 174 -1 494 -1 466 -1 208 -1 174 -1 396 -1 162 -1 266 -1 342 -1 0 -1 128 -1 316 -1 302 -1 438 -1 170 -1 20 -1 378 -1 92 -1 72 -1 4 -1 280 -1 208 -1 356 -1 382 -1 498 -1 386 -1 192 -1 286 -1 176 -1 54 -1 138 -1 216 -1 430 -1 278 -1 176 -1 318 -1 332 -1 180 -1 284 -1 12 -1 230 -1 260 -1 404 -1 384 -1 272 -1 138 -1 84 -1 348 -1 466 -1 58 -1 8 -1 230 -1 208 -1 348 -1 24 -1 172 -1 42 -1 158 -1 496 -1 0 -1 322 -1 468 -1 454 -1 100 -1 298 -1 418 -1 96 -1 26 -1 230 -1 120 -1 404 -1 436 -1 156 -1 468 -1 308 -1 196 -1 288 -1 98 -1 282 -1 318 -1 318 -1 470 -1 316 -1 0 -1 490 -1 364 -1 118 -1 134 -1 282 -1 138 -1 238 -1 118 -1 72 -1 90 -1 10 -1 306 -1 224 -1 242 -1 392 -1 272 -1 242 -1 452 -1 226 -1 402 -1 396 -1 58 -1 336 -1 168 -1 34 -1 472 -1 322 -1 498 -1 160 -1 42 -1 430 -1 458 -1 78 -1 76 -1 492 -1 218 -1 228 -1 138 -1 30 -1 64 -1 468 -1 76 -1 74 -1 342 -1 230 -1 368 -1 296 -1 216 -1 344 -1 274 -1 116 -1 256 -1 70 -1 480 -1 288 -1 244 -1 438 -1 128 -1 432 -1 202 -1 316 -1 280 -1 2 -1 80 -1 44 -1 104 -1 466 -1 366 -1 406 -1 190 -1 406 -1 114 -1 258 -1 90 -1 262 -1 348 -1 424 -1 12 -1 396 -1 164 -1 454 -1 478 -1 298 -1 164 -1 424 -1 382 -1 70 -1 480 -1 24 -1 104 -1 70 -1 438 -1 414 -1 200 -1 360 -1 248 -1 444 -1 120 -1 230 -1 478 -1 178 -1 468 -1 310 -1 460 -1 480 -1 136 -1 172 -1 214 -1 462 -1 406 -1 454 -1 384 -1 256 -1 26 -1 134 -1 384 -1 18 -1 462 -1 492 -1 100 -1 298 -1 498 -1 146 -1 458 -1 362 -1 186 -1 348 -1 18 -1 344 -1 84 -1 28 -1 448 -1 152 -1 348 -1 194 -1 414 -1 222 -1 126 -1 90 -1 400 -1 200 -2 238 -2 86 -2 278 -2 98 -2 484 -2 150 -2 224 -2 66 -2 128 -2 146 -2 406 -2 374 -2 152 -2 82 -2 166 -2 430 -2 252 -2 292 -2 338 -2 446 -2 394 -2 482 -2 174 -2 494 -2 466 -2 208 -2 174 -2 396 -2 162 -2 266 -2 342 -2 0 -2 128 -2 316 -2 302 -2 438 -2 170 -2 20 -2 378 -2 92 -2 72 -2 4 -2 280 -2 208 -2 356 -2 382 -2 498 -2 386 -2 192 -2 286 -2 176 -2 54 -2 138 -2 216 -2 430 -2 278 -2 176 -2 318 -2 332 -2 180 -2 284 -2 12 -2 230 -2 260 -2 404 -2 384 -2 272 -2 138 -2 84 -2 348 -2 466 -2 58 -2 8 -2 230 -2 208 -2 348 -2 24 -2 172 -2 42 -2 158 -2 496 -2 0 -2 322 -2 468 -2 454 -2 100 -2 298 -2 418 -2 96 -2 26 
-2 230 -2 120 -2 404 -2 436 -2 156 -2 468 -2 308 -2 196 -2 288 -2 98 -2 282 -2 318 -2 318 -2 470 -2 316 -2 0 -2 490 -2 364 -2 118 -2 134 -2 282 -2 138 -2 238 -2 118 -2 72 -2 90 -2 10 -2 306 -2 224 -2 242 -2 392 -2 272 -2 242 -2 452 -2 226 -2 402 -2 396 -2 58 -2 336 -2 168 -2 34 -2 472 -2 322 -2 498 -2 160 -2 42 -2 430 -2 458 -2 78 -2 76 -2 492 -2 218 -2 228 -2 138 -2 30 -2 64 -2 468 -2 76 -2 74 -2 342 -2 230 -2 368 -2 296 -2 216 -2 344 -2 274 -2 116 -2 256 -2 70 -2 480 -2 288 -2 244 -2 438 -2 128 -2 432 -2 202 -2 316 -2 280 -2 2 -2 80 -2 44 -2 104 -2 466 -2 366 -2 406 -2 190 -2 406 -2 114 -2 258 -2 90 -2 262 -2 348 -2 424 -2 12 -2 396 -2 164 -2 454 -2 478 -2 298 -2 164 -2 424 -2 382 -2 70 -2 480 -2 24 -2 104 -2 70 -2 438 -2 414 -2 200 -2 360 -2 248 -2 444 -2 120 -2 230 -2 478 -2 178 -2 468 -2 310 -2 460 -2 480 -2 136 -2 172 -2 214 -2 462 -2 406 -2 454 -2 384 -2 256 -2 26 -2 134 -2 384 -2 18 -2 462 -2 492 -2 100 -2 298 -2 498 -2 146 -2 458 -2 362 -2 186 -2 348 -2 18 -2 344 -2 84 -2 28 -2 448 -2 152 -2 348 -2 194 -2 414 -2 222 -2 126 -2 90 -2 400 -2 200 -3 238 -3 86 -3 278 -3 98 -3 484 -3 150 -3 224 -3 66 -3 128 -3 146 -3 406 -3 374 -3 152 -3 82 -3 166 -3 430 -3 252 -3 292 -3 338 -3 446 -3 394 -3 482 -3 174 -3 494 -3 466 -3 208 -3 174 -3 396 -3 162 -3 266 -3 342 -3 0 -3 128 -3 316 -3 302 -3 438 -3 170 -3 20 -3 378 -3 92 -3 72 -3 4 -3 280 -3 208 -3 356 -3 382 -3 498 -3 386 -3 192 -3 286 -3 176 -3 54 -3 138 -3 216 -3 430 -3 278 -3 176 -3 318 -3 332 -3 180 -3 284 -3 12 -3 230 -3 260 -3 404 -3 384 -3 272 -3 138 -3 84 -3 348 -3 466 -3 58 -3 8 -3 230 -3 208 -3 348 -3 24 -3 172 -3 42 -3 158 -3 496 -3 0 -3 322 -3 468 -3 454 -3 100 -3 298 -3 418 -3 96 -3 26 -3 230 -3 120 -3 404 -3 436 -3 156 -3 468 -3 308 -3 196 -3 288 -3 98 -3 282 -3 318 -3 318 -3 470 -3 316 -3 0 -3 490 -3 364 -3 118 -3 134 -3 282 -3 138 -3 238 -3 118 -3 72 -3 90 -3 10 -3 306 -3 224 -3 242 -3 392 -3 272 -3 242 -3 452 -3 226 -3 402 -3 396 -3 58 -3 336 -3 168 -3 34 -3 472 -3 322 -3 498 -3 160 -3 42 -3 430 -3 458 -3 78 -3 76 -3 492 -3 218 -3 228 -3 138 -3 30 -3 64 -3 468 -3 76 -3 74 -3 342 -3 230 -3 368 -3 296 -3 216 -3 344 -3 274 -3 116 -3 256 -3 70 -3 480 -3 288 -3 244 -3 438 -3 128 -3 432 -3 202 -3 316 -3 280 -3 2 -3 80 -3 44 -3 104 -3 466 -3 366 -3 406 -3 190 -3 406 -3 114 -3 258 -3 90 -3 262 -3 348 -3 424 -3 12 -3 396 -3 164 -3 454 -3 478 -3 298 -3 164 -3 424 -3 382 -3 70 -3 480 -3 24 -3 104 -3 70 -3 438 -3 414 -3 200 -3 360 -3 248 -3 444 -3 120 -3 230 -3 478 -3 178 -3 468 -3 310 -3 460 -3 480 -3 136 -3 172 -3 214 -3 462 -3 406 -3 454 -3 384 -3 256 -3 26 -3 134 -3 384 -3 18 -3 462 -3 492 -3 100 -3 298 -3 498 -3 146 -3 458 -3 362 -3 186 -3 348 -3 18 -3 344 -3 84 -3 28 -3 448 -3 152 -3 348 -3 194 -3 414 -3 222 -3 126 -3 90 -3 400 -3 200 -4 238 -4 86 -4 278 -4 98 -4 484 -4 150 -4 224 -4 66 -4 128 -4 146 -4 406 -4 374 -4 152 -4 82 -4 166 -4 430 -4 252 -4 292 -4 338 -4 446 -4 394 -4 482 -4 174 -4 494 -4 466 -4 208 -4 174 -4 396 -4 162 -4 266 -4 342 -4 0 -4 128 -4 316 -4 302 -4 438 -4 170 -4 20 -4 378 -4 92 -4 72 -4 4 -4 280 -4 208 -4 356 -4 382 -4 498 -4 386 -4 192 -4 286 -4 176 -4 54 -4 138 -4 216 -4 430 -4 278 -4 176 -4 318 -4 332 -4 180 -4 284 -4 12 -4 230 -4 260 -4 404 -4 384 -4 272 -4 138 -4 84 -4 348 -4 466 -4 58 -4 8 -4 230 -4 208 -4 348 -4 24 -4 172 -4 42 -4 158 -4 496 -4 0 -4 322 -4 468 -4 454 -4 100 -4 298 -4 418 -4 96 -4 26 -4 230 -4 120 -4 404 -4 436 -4 156 -4 468 -4 308 -4 196 -4 288 -4 98 -4 282 -4 318 -4 318 -4 470 -4 316 -4 0 -4 490 -4 364 -4 118 -4 134 -4 282 -4 138 -4 238 -4 118 -4 72 -4 90 -4 10 -4 306 -4 224 -4 242 
-4 392 -4 272 -4 242 -4 452 -4 226 -4 402 -4 396 -4 58 -4 336 -4 168 -4 34 -4 472 -4 322 -4 498 -4 160 -4 42 -4 430 -4 458 -4 78 -4 76 -4 492 -4 218 -4 228 -4 138 -4 30 -4 64 -4 468 -4 76 -4 74 -4 342 -4 230 -4 368 -4 296 -4 216 -4 344 -4 274 -4 116 -4 256 -4 70 -4 480 -4 288 -4 244 -4 438 -4 128 -4 432 -4 202 -4 316 -4 280 -4 2 -4 80 -4 44 -4 104 -4 466 -4 366 -4 406 -4 190 -4 406 -4 114 -4 258 -4 90 -4 262 -4 348 -4 424 -4 12 -4 396 -4 164 -4 454 -4 478 -4 298 -4 164 -4 424 -4 382 -4 70 -4 480 -4 24 -4 104 -4 70 -4 438 -4 414 -4 200 -4 360 -4 248 -4 444 -4 120 -4 230 -4 478 -4 178 -4 468 -4 310 -4 460 -4 480 -4 136 -4 172 -4 214 -4 462 -4 406 -4 454 -4 384 -4 256 -4 26 -4 134 -4 384 -4 18 -4 462 -4 492 -4 100 -4 298 -4 498 -4 146 -4 458 -4 362 -4 186 -4 348 -4 18 -4 344 -4 84 -4 28 -4 448 -4 152 -4 348 -4 194 -4 414 -4 222 -4 126 -4 90 -4 400 -4 200 -PREHOOK: query: select count(*) from (select key from srcpart where key % 2 = 0) t -PREHOOK: type: QUERY -PREHOOK: Input: default@srcpart -PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 -PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 -PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 -PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 -#### A masked pattern was here #### -POSTHOOK: query: select count(*) from (select key from srcpart where key % 2 = 0) t -POSTHOOK: type: QUERY -POSTHOOK: Input: default@srcpart -POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 -POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 -POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 -POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 -#### A masked pattern was here #### -988 http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out b/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out deleted file mode 100644 index c8ebe88..0000000 --- a/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out +++ /dev/null @@ -1,73 +0,0 @@ -PREHOOK: query: DESCRIBE FUNCTION get_splits -PREHOOK: type: DESCFUNCTION -POSTHOOK: query: DESCRIBE FUNCTION get_splits -POSTHOOK: type: DESCFUNCTION -get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string. -PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -PREHOOK: type: DESCFUNCTION -POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -POSTHOOK: type: DESCFUNCTION -get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string. 
-PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: database:default -PREHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3 -POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: database:default -POSTHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3 -PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: database:default -PREHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5 -POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: database:default -POSTHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5 -PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: database:default -PREHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3 -POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: database:default -POSTHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3 -PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: database:default -PREHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3 -POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: database:default -POSTHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3 -PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t -PREHOOK: type: QUERY -PREHOOK: Input: default@src -PREHOOK: Output: database:default -PREHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e -POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t -POSTHOOK: type: QUERY -POSTHOOK: Input: default@src -POSTHOOK: Output: database:default -POSTHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e -PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t -PREHOOK: type: QUERY -PREHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t -POSTHOOK: type: QUERY -POSTHOOK: Input: _dummy_database@_dummy_table -#### A masked pattern was here #### -class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit -1434872849 218 -class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 2107621793 218 -class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit -1988206222 218 -class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 1357774915 218 -class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 605302265 218
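For reference, the usage pattern exercised by the removed qfiles is sketched below in HiveQL. Both statements are copied from the deleted test content above and are illustrative only; the column aliases (r1/r2/r3, if_class/split_class) and the srcpart/src tables belong to the old tests, and the exact signature of the UDTF after this change may differ from what the stale golden files show.

    -- from the removed llap/udtf_get_splits.q.out:
    -- get_splits(query string, num splits), emitting one row per serialized split
    select r1, r2, floor(length(r3)/100000)
    from
      (select
        get_splits(
          "select key, count(*) from srcpart where key % 2 = 0 group by key",
          5) as (r1, r2, r3)) t;

    -- from the removed tez/udf_get_splits.q.out:
    -- the same UDTF consumed as an array of structs via explode()
    select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l
    from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key", 5)) as r) t;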
