Author: rohini
Date: Wed Sep 26 17:53:05 2018
New Revision: 1842040

URL: http://svn.apache.org/viewvc?rev=1842040&view=rev
Log:
PIG-3038: Support for Credentials for UDF,Loader and Storer (satishsaley via rohini)

Added: pig/trunk/test/org/apache/pig/test/TestCredentials.java Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/EvalFunc.java pig/trunk/src/org/apache/pig/LoadFunc.java pig/trunk/src/org/apache/pig/StoreFuncInterface.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Sep 26 17:53:05 2018 @@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley IMPROVEMENTS +PIG-3038: Support for Credentials for UDF,Loader and Storer (satishsaley via rohini) + PIG-5358: Remove hive-contrib jar from lib directory (szita) PIG-5343: Upgrade developer build environment (nielsbasjes via szita) Modified: pig/trunk/src/org/apache/pig/EvalFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/EvalFunc.java (original) +++ pig/trunk/src/org/apache/pig/EvalFunc.java Wed Sep 26 17:53:05 2018 @@ -20,6 +20,8 @@ package org.apache.pig; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable; import org.apache.pig.builtin.OutputSchema; @@ -382,4 +384,16 @@ public abstract class EvalFunc<T> { return null; } + /** + * Allows adding secrets or custom credentials that can be used to + * talk to external systems. For eg: keys to decrypt encrypted data, + * database passwords, hcatalog/hbase delegation tokens, etc. + * This will be called once on the front end before the job is submitted. + * The added credentials can be accessed in the backend + * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}. + * @param credentials Credentials object to which delegation tokens and secrets can be added + * @param conf + */ + public void addCredentials(Credentials credentials, Configuration conf){ + } } Modified: pig/trunk/src/org/apache/pig/LoadFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/LoadFunc.java (original) +++ pig/trunk/src/org/apache/pig/LoadFunc.java Wed Sep 26 17:53:05 2018 @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.Count import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.security.Credentials; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.LoadPushDown.RequiredFieldList; @@ -362,4 +363,16 @@ public abstract class LoadFunc { return null; } + /** + * Allows adding secrets or custom credentials that can be used to + * talk to external systems. For eg: keys to decrypt encrypted data, + * database passwords, hcatalog/hbase delegation tokens, etc. + * This will be called once on the front end before the job is submitted. 
+ * The added credentials can be accessed in the backend + * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}. + * @param credentials Credentials object to which delegation tokens and secrets can be added + * @param conf + */ + public void addCredentials(Credentials credentials, Configuration conf) { + } } Modified: pig/trunk/src/org/apache/pig/StoreFuncInterface.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFuncInterface.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/StoreFuncInterface.java (original) +++ pig/trunk/src/org/apache/pig/StoreFuncInterface.java Wed Sep 26 17:53:05 2018 @@ -19,11 +19,12 @@ package org.apache.pig; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; - +import org.apache.hadoop.security.Credentials; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.data.Tuple; @@ -152,4 +153,17 @@ public interface StoreFuncInterface { * any runtime job information. */ void cleanupOnSuccess(String location, Job job) throws IOException; + + /** + * Allows adding secrets or custom credentials that can be used to + * talk to external systems. For eg: keys to decrypt encrypted data, + * database passwords, hcatalog/hbase delegation tokens, etc. + * This will be called once on the front end before the job is submitted. + * The added credentials can be accessed in the backend + * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}. 
+ * @param credentials Credentials object to which delegation tokens and secrets can be added + * @param conf + */ + default void addCredentials(Credentials credentials, Configuration conf) { + } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 26 17:53:05 2018 @@ -82,6 +82,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor; @@ -551,13 +552,20 @@ public class JobControlCompiler{ for (POLoad ld : lds) { LoadFunc lf = ld.getLoadFunc(); lf.setLocation(ld.getLFile().getFileName(), nwJob); - + lf.addCredentials(nwJob.getCredentials(), conf); ld.setParentPlan(null); //Store the inp filespecs inp.add(ld); } } + //Process the POUserFunc + List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(mro.mapPlan, POUserFunc.class); + userFuncs.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, 
POUserFunc.class)); + for (POUserFunc userFunc : userFuncs) { + userFunc.getFunc().addCredentials(nwJob.getCredentials(), conf); + } + if(!mro.reducePlan.isEmpty()){ log.info("Reduce phase detected, estimating # of required reducers."); adjustNumReducers(plan, mro, nwJob); @@ -774,6 +782,7 @@ public class JobControlCompiler{ osf.cleanupOutput(st, nwJob); } } + sFunc.addCredentials(nwJob.getCredentials(), conf); } for (POStore st : reduceStores) { @@ -786,6 +795,7 @@ public class JobControlCompiler{ osf.cleanupOutput(st, nwJob); } } + sFunc.addCredentials(nwJob.getCredentials(), conf); } setOutputFormat(nwJob); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Sep 26 17:53:05 2018 @@ -32,6 +32,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; @@ -68,6 +69,7 @@ public class POUserFunc extends Expressi private transient EvalFunc func; private transient List<String> cacheFiles = null; private transient List<String> shipFiles = null; + private transient Credentials creds = null; FuncSpec funcSpec; FuncSpec origFSpec; @@ -642,4 +644,13 @@ public class POUserFunc extends Expressi public boolean needEndOfAllInputProcessing() { return 
getFunc().needEndOfAllInputProcessing(); } + + public Credentials getCredentials() { + return this.creds; + } + + public void setCredentials(Credentials creds) { + this.creds = creds; + } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Sep 26 17:53:05 2018 @@ -84,6 +84,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; @@ -360,7 +361,6 @@ public class TezDagBuilder extends TezOp public void visitTezOp(TezOperator tezOp) throws VisitorException { TezOperPlan tezPlan = getPlan(); List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp); - // Construct vertex for the current Tez operator Vertex to = null; try { @@ -611,6 +611,7 @@ public class TezDagBuilder extends TezOp private Vertex newVertex(TezOperator tezOp) throws IOException, ClassNotFoundException, InterruptedException { + ProcessorDescriptor procDesc = 
ProcessorDescriptor.create( tezOp.getProcessorName()); @@ -642,6 +643,9 @@ public class TezDagBuilder extends TezOp // Process stores LinkedList<POStore> stores = processStores(tezOp, payloadConf, job); + // Process UserFuncs + processUserFuncs(tezOp, job); + Configuration inputPayLoad = null; Configuration outputPayLoad = null; @@ -1044,6 +1048,19 @@ public class TezDagBuilder extends TezOp return vertex; } + /** + * Process POUserFunc to add credentials + * @param tezOp + * @param job + * @throws VisitorException + */ + private void processUserFuncs(TezOperator tezOp, Job job) throws VisitorException { + List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class); + for (POUserFunc userFunc : userFuncs) { + userFunc.getFunc().addCredentials(job.getCredentials(), job.getConfiguration()); + } + } + private LinkedList<POStore> processStores(TezOperator tezOp, Configuration payloadConf, Job job) throws VisitorException, IOException { @@ -1057,6 +1074,7 @@ public class TezDagBuilder extends TezOp storeLocations.add(st); StoreFuncInterface sFunc = st.getStoreFunc(); sFunc.setStoreLocation(st.getSFile().getFileName(), job); + sFunc.addCredentials(job.getCredentials(), job.getConfiguration()); } Path tmpLocation = null; Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Wed Sep 26 17:53:05 2018 @@ -100,7 +100,7 @@ public class LoaderProcessor extends Tez for (POLoad ld : lds) { LoadFunc lf = ld.getLoadFunc(); 
lf.setLocation(ld.getLFile().getFileName(), job); - + lf.addCredentials(this.jobConf.getCredentials(), conf); // Store the inp filespecs inp.add(ld.getLFile()); } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1842040&r1=1842039&r2=1842040&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Sep 26 17:53:05 2018 @@ -17,7 +17,6 @@ package org.apache.pig.backend.hadoop.hbase; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; import java.math.BigDecimal; import java.math.BigInteger; @@ -78,6 +77,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.pig.CollectableLoadFunc; import org.apache.pig.LoadCaster; @@ -766,11 +766,6 @@ public class HBaseStorage extends LoadFu job.getConfiguration().setBoolean("pig.noSplitCombination", true); m_conf = initializeLocalJobConfig(job); - String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET); - if (delegationTokenSet == null) { - addHBaseDelegationToken(m_conf, job); - udfProps.setProperty(HBASE_TOKEN_SET, "true"); - } String tablename = location; if (location.startsWith("hbase://")) { @@ -830,6 +825,35 @@ public class HBaseStorage extends LoadFu return FuncUtils.getShipFiles(classList); } + + @Override + public void addCredentials(Credentials credentials, Configuration conf) { + JobConf jobConf = initializeLocalJobConfig(conf); + if 
("kerberos".equalsIgnoreCase(jobConf.get(HBASE_SECURITY_CONF_KEY))) { + LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token."); + try { + User currentUser = User.getCurrent(); + UserGroupInformation currentUserGroupInformation = currentUser.getUGI(); + if (currentUserGroupInformation.hasKerberosCredentials()) { + try (Connection connection = ConnectionFactory.createConnection(jobConf, currentUser)) { + TokenUtil.obtainTokenForJob(connection, jobConf, currentUser); + LOG.info("Token retrieval succeeded for user " + currentUser.getName()); + credentials.addAll(jobConf.getCredentials()); + } + } else { + LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName()); + } + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new UndeclaredThrowableException(e, + "Unexpected error calling TokenUtil.obtainTokenForJob()"); + } + } else { + LOG.info("hbase is not configured to use kerberos, skipping delegation token"); + } + } + private void addClassToList(String className, List<Class> classList) { try { Class klass = Class.forName(className); @@ -839,9 +863,8 @@ public class HBaseStorage extends LoadFu } } - private JobConf initializeLocalJobConfig(Job job) { + private JobConf initializeLocalJobConfig(Configuration jobConf) { Properties udfProps = getUDFProperties(); - Configuration jobConf = job.getConfiguration(); JobConf localConf = new JobConf(jobConf); if (udfProps.containsKey(HBASE_CONFIG_SET)) { for (Entry<Object, Object> entry : udfProps.entrySet()) { @@ -864,40 +887,8 @@ public class HBaseStorage extends LoadFu return localConf; } - /** - * Get delegation token from hbase and add it to the Job - * - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void addHBaseDelegationToken(Configuration hbaseConf, Job job) { - - if (!UDFContext.getUDFContext().isFrontend()) { - LOG.debug("skipping authentication checks because we're currently in a frontend 
UDF context"); - return; - } - - if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) { - LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token."); - try { - User currentUser = User.getCurrent(); - UserGroupInformation currentUserGroupInformation = currentUser.getUGI(); - if (currentUserGroupInformation.hasKerberosCredentials()) { - try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) { - TokenUtil.obtainTokenForJob(connection, currentUser, job); - LOG.info("Token retrieval succeeded for user " + currentUser.getName()); - } - } else { - LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName()); - } - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new UndeclaredThrowableException(e, - "Unexpected error calling TokenUtil.obtainTokenForJob()"); - } - } else { - LOG.info("hbase is not configured to use kerberos, skipping delegation token"); - } + private JobConf initializeLocalJobConfig(Job job) { + return initializeLocalJobConfig(job.getConfiguration()); } @Override @@ -1129,11 +1120,6 @@ public class HBaseStorage extends LoadFu } m_conf = initializeLocalJobConfig(job); - // Not setting a udf property and getting the hbase delegation token - // only once like in setLocation as setStoreLocation gets different Job - // objects for each call and the last Job passed is the one that is - // launched. So we end up getting multiple hbase delegation tokens. 
- addHBaseDelegationToken(m_conf, job); } @Override Added: pig/trunk/test/org/apache/pig/test/TestCredentials.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCredentials.java?rev=1842040&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestCredentials.java (added) +++ pig/trunk/test/org/apache/pig/test/TestCredentials.java Wed Sep 26 17:53:05 2018 @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Iterator; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.pig.EvalFunc; +import org.apache.pig.PigServer; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.test.utils.GenPhyOp; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TestCredentials { + private static PigServer pigServer; + private static PigContext pc; + private static MiniGenericCluster cluster; + private static String INPUT_FILE = "input.txt"; + private static String OUTPUT_DIR = "output"; + private static final Text ALIAS = new Text ("testKey"); + private static final String SECRET = "dummySecret"; + + public static class CredentialsEvalFunc extends EvalFunc<String>{ + @Override + public String exec(Tuple input) throws IOException { + String val = new String(UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS)); + if(!SECRET.equals(val)) { + throw new IOException("Invalid secret"); + } + return val; + } + + @Override + public void addCredentials(Credentials credentials, Configuration conf) { + Credentials creds = new Credentials(); + creds.addSecretKey(ALIAS, SECRET.getBytes()); + credentials.addAll(creds); + } + } + + public static class CredPigStorage extends PigStorage { + @Override + public void addCredentials(Credentials credentials, Configuration conf) 
{ + Credentials creds = new Credentials(); + creds.addSecretKey(ALIAS, SECRET.getBytes()); + credentials.addAll(creds); + } + + @Override + public Tuple getNext() throws IOException { + Tuple ret = super.getNext(); + if(ret == null) { + return ret; + } + byte[] b = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS); + if(b != null) { + ret.append(new String(b)); + } + return ret; + } + + @Override + public void putNext(Tuple tuple) throws IOException { + if(tuple == null) { + return; + } + byte[] b = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS); + if(b != null) { + tuple.append(new String(b)); + } + super.putNext(tuple); + } + } + + @BeforeClass + public static void setup() throws IOException { + cluster = MiniGenericCluster.buildCluster(); + pc = new PigContext(cluster.getExecType(), cluster.getProperties()); + pc.connect(); + pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + GenPhyOp.setPc(pc); + createInput(); + } + + @AfterClass + public static void tearDown() throws Exception { + deleteInput(); + if(pigServer!=null) { + pigServer.shutdown(); + } + cluster.shutDown(); + } + + @Test + public void testCredentialsEvalFunc() throws IOException { + Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')"); + pigServer.registerQuery("a = load '"+ INPUT_FILE +"' as (i:chararray);"); + pigServer.registerQuery("d = foreach a generate " + CredentialsEvalFunc.class.getName() + "(i);"); + Iterator<Tuple> it = pigServer.openIterator("d"); + assertTrue(it.hasNext()); + assertEquals(expectedResult, it.next()); + assertFalse(it.hasNext()); + } + + @Test + public void testCredentialsLoadFunc() throws Exception { + Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')"); + pigServer.registerQuery("a = load '" + INPUT_FILE + "' using " + CredPigStorage.class.getName() + + "() as (text:chararray, secstr:chararray);"); + pigServer.registerQuery("d = foreach a generate 
secstr;"); + Iterator<Tuple> it = pigServer.openIterator("d"); + assertTrue(it.hasNext()); + assertEquals(expectedResult, it.next()); + assertFalse(it.hasNext()); + } + + @Test + public void testCredentialsStoreFunc() throws Exception { + Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')"); + pigServer.registerQuery("a = load '" + INPUT_FILE + "' using PigStorage() as (text:chararray);"); + pigServer.registerQuery("store a into '" + OUTPUT_DIR +"' using " + CredPigStorage.class.getName() + "();"); + pigServer.registerQuery("c = load '" + OUTPUT_DIR + "' using PigStorage() as (text:chararray, secstr:chararray);"); + pigServer.registerQuery("d = foreach c generate secstr;"); + Iterator<Tuple> it = pigServer.openIterator("d"); + assertTrue(it.hasNext()); + assertEquals(expectedResult, it.next()); + assertFalse(it.hasNext()); + } + + private static void createInput() throws IOException { + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("dumb"); + w.close(); + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + } + + private static void deleteInput() throws IOException { + new File(INPUT_FILE).delete(); + FileUtils.deleteDirectory(new File(OUTPUT_DIR)); + Util.deleteFile(cluster, INPUT_FILE); + Util.deleteFile(cluster, OUTPUT_DIR); + } + +}