As I said before, I think the configure() method of the original
HadoopOutputFormat should be called in the configure() method of the Flink
HadoopOutputFormatBase. Flink calls configure() before open() and
finalizeOnMaster(), so that should work.
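
To illustrate the idea, here is a minimal, self-contained sketch. It does not use the real Flink or Hadoop classes: ConfigurableFormat, TableFormatStub and WrapperSketch are hypothetical stand-ins for Hadoop's Configurable, the wrapped Hadoop output format and the Flink HadoopOutputFormatBase, and the configuration is a plain Map. Treat it as an outline of where the call could go, not as the actual patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.conf.Configurable,
// reduced to the single method that matters here.
interface ConfigurableFormat {
    void setConf(Map<String, String> conf);
}

// Hypothetical stand-in for a Hadoop output format that must receive
// its configuration before it can be used.
class TableFormatStub implements ConfigurableFormat {
    String table; // stays null until setConf() has been called

    @Override
    public void setConf(Map<String, String> conf) {
        this.table = conf.get("hbase.mapred.outputtable");
    }
}

// Hypothetical stand-in for the Flink-side wrapper.
class WrapperSketch {
    private final Object wrapped;
    private final Map<String, String> configuration;

    WrapperSketch(Object wrapped, Map<String, String> configuration) {
        this.wrapped = wrapped;
        this.configuration = configuration;
    }

    // Forward the configuration once, in configure(). Because configure()
    // runs before open() and the finalize step, both would see a
    // configured format without repeating the instanceof check.
    void configure() {
        if (wrapped instanceof ConfigurableFormat) {
            ((ConfigurableFormat) wrapped).setConf(configuration);
        }
    }
}

class ConfigureSketchDemo {
    // Builds a configuration, wraps the stub format, and returns the
    // table name the stub picked up through configure().
    static String run() {
        Map<String, String> conf = new HashMap<>();
        conf.put("hbase.mapred.outputtable", "myTable");
        TableFormatStub format = new TableFormatStub();
        WrapperSketch wrapper = new WrapperSketch(format, conf);
        wrapper.configure();
        return format.table;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Formats that do not implement the interface are simply left untouched, so the check would be safe for every wrapped output format.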

Have you checked if that fixes your problem?
If so, I'd suggest opening a PR with this fix.

Thanks, Fabian

2015-04-01 13:44 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Any feedback about this?
>
> On Tue, Mar 31, 2015 at 7:07 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
> > Hi Flink devs,
> > this is my final report about the HBaseOutputFormat problem (with Flink
> > 0.8.1), and I hope you can suggest the best way to make a PR:
> >
> > 1) The following code produces the error reported below (this should be
> > fixed in 0.9, right?)
> >   Job job = Job.getInstance();
> >   myDataset.output(new HadoopOutputFormat<Text, Mutation>(
> >       new TableOutputFormat<Text>(), job));
> >
> > org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
> >     at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
> >
> > 2) So I created a custom HBaseTableOutputFormat (see the end of this
> > mail), which is basically copied from the HBase TableInputFormat and
> > correctly sets the "mapred.output.dir" parameter required by the
> > HadoopOutputFormatBase, so I can make it work:
> >   Job job = Job.getInstance();
> >   job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
> >   HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
> >   HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);
> >   myDataset.output(outOF);
> >
> > 3) However, this still does not work unless you call setConf() on
> > Configurable subclasses in the HadoopOutputFormatBase:
> >
> > - In the public void finalizeGlobal(int parallelism) throws IOException
> > method:
> > ....
> >   if (this.mapreduceOutputFormat instanceof Configurable) {
> >     ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
> >   }
> >   this.fileOutputCommitter = new FileOutputCommitter(
> >       new Path(this.configuration.get("mapred.output.dir")), taskContext);
> > ....
> > - In the public void open(int taskNumber, int numTasks) throws IOException
> > method:
> > ....
> >   if (this.mapreduceOutputFormat instanceof Configurable) {
> >     ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
> >   }
> >   try {
> >     this.context = HadoopUtils.instantiateTaskAttemptContext(
> >         this.configuration, taskAttemptID);
> >   } catch (Exception e) {
> >     throw new RuntimeException(e);
> >   }
> > ....
> >
> > 4) The modifications made in point 3 should probably be applied to both
> > the mapreduce and mapred packages.
> >
> > Thanks in advance,
> > Flavio
> >
> >
> >
> > -----------------------------------------------------------------------
> > this is the HBaseTableOutputFormat.java:
> > -----------------------------------------------------------------------
> > import java.io.IOException;
> >
> > import org.apache.commons.logging.Log;
> > import org.apache.commons.logging.LogFactory;
> > import org.apache.hadoop.classification.InterfaceAudience;
> > import org.apache.hadoop.classification.InterfaceStability;
> > import org.apache.hadoop.conf.Configurable;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.hbase.HBaseConfiguration;
> > import org.apache.hadoop.hbase.HConstants;
> > import org.apache.hadoop.hbase.client.Delete;
> > import org.apache.hadoop.hbase.client.HTable;
> > import org.apache.hadoop.hbase.client.Put;
> > import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
> > import org.apache.hadoop.hbase.util.FSUtils;
> > import org.apache.hadoop.hbase.zookeeper.ZKUtil;
> > import org.apache.hadoop.mapreduce.JobContext;
> > import org.apache.hadoop.mapreduce.OutputCommitter;
> > import org.apache.hadoop.mapreduce.OutputFormat;
> > import org.apache.hadoop.mapreduce.RecordWriter;
> > import org.apache.hadoop.mapreduce.TaskAttemptContext;
> >
> > /**
> >  * Convert Map/Reduce output and write it to an HBase table. The KEY is
> >  * ignored while the output value <u>must</u> be either a {@link Put} or a
> >  * {@link Delete} instance.
> >  *
> >  * @param <KEY>  The type of the key. Ignored in this class.
> >  */
> > @InterfaceAudience.Public
> > @InterfaceStability.Stable
> > public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put>
> >     implements Configurable {
> >
> >   private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);
> >
> >   /** Job parameter that specifies the output table. */
> >   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
> >
> >   /**
> >    * Optional job parameter to specify a peer cluster.
> >    * Used specifying remote cluster when copying between hbase clusters
> >    * (the source is picked up from <code>hbase-site.xml</code>).
> >    * @see TableMapReduceUtil#initTableReducerJob(String, Class,
> >    * org.apache.hadoop.mapreduce.Job, Class, String, String, String)
> >    */
> >   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
> >
> >   /** Optional job parameter to specify peer cluster's ZK client port */
> >   public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";
> >
> >   /** Optional specification of the rs class name of the peer cluster */
> >   public static final String
> >       REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
> >   /** Optional specification of the rs impl name of the peer cluster */
> >   public static final String
> >       REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
> >
> >   /** The configuration. */
> >   private Configuration conf = null;
> >
> >   private HTable table;
> >
> >   /**
> >    * Writes the reducer output to an HBase table.
> >    *
> >    * @param <KEY>  The type of the key.
> >    */
> >   protected static class TableRecordWriter<KEY>
> >       extends RecordWriter<KEY, Put> {
> >
> >     /** The table to write to. */
> >     private HTable table;
> >
> >     /**
> >      * Instantiate a TableRecordWriter with the HBase HClient for writing.
> >      *
> >      * @param table  The table to write to.
> >      */
> >     public TableRecordWriter(HTable table) {
> >       this.table = table;
> >     }
> >
> >     /**
> >      * Closes the writer, in this case flush table commits.
> >      *
> >      * @param context  The context.
> >      * @throws IOException When closing the writer fails.
> >      * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
> >      */
> >     @Override
> >     public void close(TaskAttemptContext context)
> >     throws IOException {
> >       table.close();
> >     }
> >
> >     /**
> >      * Writes a key/value pair into the table.
> >      *
> >      * @param key  The key.
> >      * @param value  The value.
> >      * @throws IOException When writing fails.
> >      * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
> >      */
> >     @Override
> >     public void write(KEY key, Put value)
> >     throws IOException {
> >       if (value instanceof Put) this.table.put(new Put((Put)value));
> > //      else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
> >       else throw new IOException("Pass a Delete or a Put");
> >     }
> >   }
> >
> >   /**
> >    * Creates a new record writer.
> >    *
> >    * @param context  The current task context.
> >    * @return The newly created writer instance.
> >    * @throws IOException When creating the writer fails.
> >    * @throws InterruptedException When the jobs is cancelled.
> >    * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> >    */
> >   @Override
> >   public RecordWriter<KEY, Put> getRecordWriter(
> >     TaskAttemptContext context)
> >   throws IOException, InterruptedException {
> >     return new TableRecordWriter<KEY>(this.table);
> >   }
> >
> >   /**
> >    * Checks if the output target exists.
> >    *
> >    * @param context  The current context.
> >    * @throws IOException When the check fails.
> >    * @throws InterruptedException When the job is aborted.
> >    * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
> >    */
> >   @Override
> >   public void checkOutputSpecs(JobContext context) throws IOException,
> >       InterruptedException {
> >     // TODO Check if the table exists?
> >
> >   }
> >
> >   /**
> >    * Returns the output committer.
> >    *
> >    * @param context  The current context.
> >    * @return The committer.
> >    * @throws IOException When creating the committer fails.
> >    * @throws InterruptedException When the job is aborted.
> >    * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> >    */
> >   @Override
> >   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
> >   throws IOException, InterruptedException {
> >     return new TableOutputCommitter();
> >   }
> >
> >   public Configuration getConf() {
> >     return conf;
> >   }
> >
> >   @Override
> >   public void setConf(Configuration otherConf) {
> >     this.conf = HBaseConfiguration.create(otherConf);
> >
> >     String tableName = this.conf.get(OUTPUT_TABLE);
> >     if(tableName == null || tableName.length() <= 0) {
> >       throw new IllegalArgumentException("Must specify table name");
> >     }
> >
> >     String address = this.conf.get(QUORUM_ADDRESS);
> >     int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
> >     String serverClass = this.conf.get(REGION_SERVER_CLASS);
> >     String serverImpl = this.conf.get(REGION_SERVER_IMPL);
> >
> >     try {
> >       if (address != null) {
> >         ZKUtil.applyClusterKeyToConf(this.conf, address);
> >       }
> >       if (serverClass != null) {
> >         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
> >       }
> >       if (zkClientPort != 0) {
> >         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
> >       }
> >       this.table = new HTable(this.conf, tableName);
> >       this.table.setAutoFlush(false, true);
> >       String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf),
> >           this.table.getName()).toString();
> >       this.conf.set("mapred.output.dir", outDir);
> >       otherConf.set("mapred.output.dir", outDir);
> >       LOG.info("Created table instance for "  + tableName);
> >     } catch(IOException e) {
> >       LOG.error(e);
> >       throw new RuntimeException(e);
> >     }
> >   }
> > }
> >
> >
>
