Any feedback about this?

On Tue, Mar 31, 2015 at 7:07 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Flink devs,
> this is my final report about the HBaseOutputFormat problem (with Flink
> 0.8.1), and I hope you can suggest the best way to turn it into a PR:
>
> 1) The following code produces the error reported below (this should be
> fixed in 0.9, right?):
>       Job job = Job.getInstance();
>   myDataset.output( new HadoopOutputFormat<Text, *Mutation*>(new
> *TableOutputFormat*<Text>(), job));
>
> org.apache.flink.api.common.functions.InvalidTypesException: Interfaces
> and abstract classes are not valid types: class
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
> at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
>
> 2) So I created a custom HBaseTableOutputFormat (*see the end of this
> mail*) that is basically copied from the HBase TableOutputFormat and that
> correctly sets the "mapred.output.dir" param required by
> HadoopOutputFormatBase, so I can make it work:
>                 Job job = Job.getInstance();
> job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,
> outputTableName);
> HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
> HadoopOutputFormat<Text, Put> outOF = new
> HadoopOutputFormat<>(hbaseTOF, job);
> myDataset.output(outOF);
>
> 3) However, this still does not work unless setConf() is called on
> Configurable output formats in HadoopOutputFormatBase:
>
> - in the *public void finalizeGlobal(int parallelism) throws IOException*
> method:
> ....
>                * if(this.mapreduceOutputFormat instanceof Configurable){*
> * ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);*
> * }*
> this.fileOutputCommitter = new FileOutputCommitter(new
> Path(this.configuration.get("mapred.output.dir")), taskContext);
> ....
> - in the *public void open(int taskNumber, int numTasks) throws
> IOException* method:
> ....
>
>               *  if(this.mapreduceOutputFormat instanceof Configurable){*
> * ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);*
> * }*
>  try {
> this.context =
> HadoopUtils.instantiateTaskAttemptContext(this.configuration,
> taskAttemptID);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> ....
>
> 4) The modifications made in point 3 should probably be applied to both
> the mapreduce and mapred packages.
>
> Thanks in advance,
> Flavio
>
>
>
> -----------------------------------------------------------------------
> this is the HBaseTableOutputFormat.java:
> -----------------------------------------------------------------------
> import java.io.IOException;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configurable;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.HConstants;
> import org.apache.hadoop.hbase.client.Delete;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
> import org.apache.hadoop.hbase.util.FSUtils;
> import org.apache.hadoop.hbase.zookeeper.ZKUtil;
> import org.apache.hadoop.mapreduce.JobContext;
> import org.apache.hadoop.mapreduce.OutputCommitter;
> import org.apache.hadoop.mapreduce.OutputFormat;
> import org.apache.hadoop.mapreduce.RecordWriter;
> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>
> /**
>  * Convert Map/Reduce output and write it to an HBase table. The KEY is
>  * ignored, while the output value <u>must</u> be a {@link Put} instance
>  * ({@link Delete} handling is commented out in this copy).
>  *
>  * @param <KEY>  The type of the key. Ignored in this class.
>  */
> @InterfaceAudience.Public
> @InterfaceStability.Stable
> public class HBaseTableOutputFormat<KEY>* extends OutputFormat<KEY, Put>*
> implements Configurable {
>
>   private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);
>
>   /** Job parameter that specifies the output table. */
>   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
>
>   /**
>    * Optional job parameter to specify a peer cluster.
>    * Used to specify the remote cluster when copying between HBase
>    * clusters (the source is picked up from <code>hbase-site.xml</code>).
>    * @see TableMapReduceUtil#initTableReducerJob(String, Class,
> org.apache.hadoop.mapreduce.Job, Class, String, String, String)
>    */
>   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
>
>   /** Optional job parameter to specify peer cluster's ZK client port */
>   public static final String QUORUM_PORT =
> "hbase.mapred.output.quorum.port";
>
>   /** Optional specification of the rs class name of the peer cluster */
>   public static final String
>       REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
>   /** Optional specification of the rs impl name of the peer cluster */
>   public static final String
>       REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
>
>   /** The configuration. */
>   private Configuration conf = null;
>
>   private HTable table;
>
>   /**
>    * Writes the reducer output to an HBase table.
>    *
>    * @param <KEY>  The type of the key.
>    */
>   protected static class TableRecordWriter<KEY>
>   *extends RecordWriter<KEY, Put> *{
>
>     /** The table to write to. */
>     private HTable table;
>
>     /**
>      * Instantiate a TableRecordWriter with the HBase HClient for writing.
>      *
>      * @param table  The table to write to.
>      */
>     public TableRecordWriter(HTable table) {
>       this.table = table;
>     }
>
>     /**
>      * Closes the writer, in this case flush table commits.
>      *
>      * @param context  The context.
>      * @throws IOException When closing the writer fails.
>      * @see
> org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
>      */
>     @Override
>     public void close(TaskAttemptContext context)
>     throws IOException {
>       table.close();
>     }
>
>     /**
>      * Writes a key/value pair into the table.
>      *
>      * @param key  The key.
>      * @param value  The value.
>      * @throws IOException When writing fails.
>      * @see
> org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object,
> java.lang.Object)
>      */
>     @Override
>     *public void write(KEY key, Put value)*
> *    throws IOException {*
> *      if (value instanceof Put) this.table.put(new Put((Put)value));*
> *//      else if (value instanceof Delete) this.table.delete(new
> Delete((Delete)value));*
> *      else throw new IOException("Pass a Delete or a Put");*
> *    }*
>   }
>
>   /**
>    * Creates a new record writer.
>    *
>    * @param context  The current task context.
>    * @return The newly created writer instance.
>    * @throws IOException When creating the writer fails.
>    * @throws InterruptedException When the job is cancelled.
>    * @see
> org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
>    */
>   @Override
>   public RecordWriter<KEY, *Put*> getRecordWriter(
>     TaskAttemptContext context)
>   throws IOException, InterruptedException {
>     return new TableRecordWriter<KEY>(this.table);
>   }
>
>   /**
>    * Checks if the output target exists.
>    *
>    * @param context  The current context.
>    * @throws IOException When the check fails.
>    * @throws InterruptedException When the job is aborted.
>    * @see
> org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
>    */
>   @Override
>   public void checkOutputSpecs(JobContext context) throws IOException,
>       InterruptedException {
>     // TODO Check if the table exists?
>
>   }
>
>   /**
>    * Returns the output committer.
>    *
>    * @param context  The current context.
>    * @return The committer.
>    * @throws IOException When creating the committer fails.
>    * @throws InterruptedException When the job is aborted.
>    * @see
> org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
>    */
>   @Override
>   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
>   throws IOException, InterruptedException {
>     return new TableOutputCommitter();
>   }
>
>   public Configuration getConf() {
>     return conf;
>   }
>
>   @Override
>   public void setConf(Configuration otherConf) {
>     this.conf = HBaseConfiguration.create(otherConf);
>
>     String tableName = this.conf.get(OUTPUT_TABLE);
>     if(tableName == null || tableName.length() <= 0) {
>       throw new IllegalArgumentException("Must specify table name");
>     }
>
>     String address = this.conf.get(QUORUM_ADDRESS);
>     int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
>     String serverClass = this.conf.get(REGION_SERVER_CLASS);
>     String serverImpl = this.conf.get(REGION_SERVER_IMPL);
>
>     try {
>       if (address != null) {
>         ZKUtil.applyClusterKeyToConf(this.conf, address);
>       }
>       if (serverClass != null) {
>         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
>       }
>       if (zkClientPort != 0) {
>         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
>       }
>       this.table = new HTable(this.conf, tableName);
>       this.table.setAutoFlush(false, true);
>     *  String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf),
> this.table.getName()).toString();*
> *      this.conf.set("mapred.output.dir", outDir);*
> *      otherConf.set("mapred.output.dir", outDir);*
>       LOG.info("Created table instance for "  + tableName);
>     } catch(IOException e) {
>       LOG.error(e);
>       throw new RuntimeException(e);
>     }
>   }
> }
>
>

Reply via email to