Hi Flavio,

Thanks for looking into this problem. Actually, it's a bit difficult to discuss your changes here because of the formatting/syntax highlighting and the missing context of the classes. Usually, we do that in a pull request. Do you have a GitHub account? If so, push your changes to your forked Flink repository, and GitHub will then offer to create a pull request from your modified branch.
Let's discuss your changes on GitHub.

Best,
Max

On Wed, Apr 1, 2015 at 1:44 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Any feedback about this?
>
> On Tue, Mar 31, 2015 at 7:07 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
> > Hi Flink devs,
> > this is my final report about the HBaseOutputFormat problem (with Flink
> > 0.8.1), and I hope you can suggest the best way to make a PR:
> >
> > 1) The following code produces the error reported below (this should be
> > fixed in 0.9, right?):
> >
> >     Job job = Job.getInstance();
> >     myDataset.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
> >
> > org.apache.flink.api.common.functions.InvalidTypesException: Interfaces
> > and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
> >     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
> >     at org.apache.flink.api.java.DataSet.map(DataSet.java:160)
> >
> > 2) So I created a custom HBaseTableOutputFormat -- see the end of this
> > mail -- (basically a copy of the HBase TableOutputFormat) that correctly
> > sets the "mapred.output.dir" parameter required by HadoopOutputFormatBase,
> > so I can make it work:
> >
> >     Job job = Job.getInstance();
> >     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
> >     HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
> >     HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);
> >     myDataset.output(outOF);
> >
> > 3) However, this still does not work unless setConf() is called on
> > Configurable subclasses inside HadoopOutputFormatBase:
> >
> > - in the public void finalizeGlobal(int parallelism) throws IOException method:
> >
> >     ....
> >     if (this.mapreduceOutputFormat instanceof Configurable) {
> >         ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
> >     }
> >     this.fileOutputCommitter = new FileOutputCommitter(
> >         new Path(this.configuration.get("mapred.output.dir")), taskContext);
> >     ....
> >
> > - in the public void open(int taskNumber, int numTasks) throws IOException method:
> >
> >     ....
> >     if (this.mapreduceOutputFormat instanceof Configurable) {
> >         ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
> >     }
> >     try {
> >         this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
> >     } catch (Exception e) {
> >         throw new RuntimeException(e);
> >     }
> >     ....
> >
> > 4) The modifications made in point 3 should probably be applied to both
> > the mapreduce and the mapred packages (a consolidated sketch of the guard
> > follows below).
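To make points 3 and 4 concrete, here is a minimal sketch of that guard extracted into a reusable helper. The class and method names (ConfigurableSupport, configureIfConfigurable) are illustrative only and are not existing Flink code; the idea is simply the instanceof check quoted above, shared by both call sites.

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;

    /**
     * Illustrative helper: push the job Configuration into a wrapped Hadoop
     * OutputFormat if (and only if) it implements Configurable, as HBase's
     * TableOutputFormat does. The two call sites from point 3 -- open(...) and
     * finalizeGlobal(...) of HadoopOutputFormatBase -- would then both reduce
     * to a single call, and the same call could be reused in the mapred-package
     * wrapper mentioned in point 4.
     */
    public final class ConfigurableSupport {

        private ConfigurableSupport() {
        }

        /** Calls setConf() on the format when it is Configurable; otherwise a no-op. */
        public static void configureIfConfigurable(Object outputFormat, Configuration configuration) {
            if (outputFormat instanceof Configurable) {
                ((Configurable) outputFormat).setConf(configuration);
            }
        }
    }

With such a helper, both snippets in point 3 become a single call to configureIfConfigurable(this.mapreduceOutputFormat, this.configuration), placed right before the FileOutputCommitter / TaskAttemptContext is created.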
> > Thanks in advance,
> > Flavio
> >
> > -----------------------------------------------------------------------
> > this is the HBaseTableOutputFormat.java:
> > -----------------------------------------------------------------------
> >
> > import java.io.IOException;
> >
> > import org.apache.commons.logging.Log;
> > import org.apache.commons.logging.LogFactory;
> > import org.apache.hadoop.classification.InterfaceAudience;
> > import org.apache.hadoop.classification.InterfaceStability;
> > import org.apache.hadoop.conf.Configurable;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.hbase.HBaseConfiguration;
> > import org.apache.hadoop.hbase.HConstants;
> > import org.apache.hadoop.hbase.client.Delete;
> > import org.apache.hadoop.hbase.client.HTable;
> > import org.apache.hadoop.hbase.client.Put;
> > import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
> > import org.apache.hadoop.hbase.util.FSUtils;
> > import org.apache.hadoop.hbase.zookeeper.ZKUtil;
> > import org.apache.hadoop.mapreduce.JobContext;
> > import org.apache.hadoop.mapreduce.OutputCommitter;
> > import org.apache.hadoop.mapreduce.OutputFormat;
> > import org.apache.hadoop.mapreduce.RecordWriter;
> > import org.apache.hadoop.mapreduce.TaskAttemptContext;
> >
> > /**
> >  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
> >  * while the output value <u>must</u> be either a {@link Put} or a
> >  * {@link Delete} instance.
> >  *
> >  * @param <KEY> The type of the key. Ignored in this class.
> >  */
> > @InterfaceAudience.Public
> > @InterfaceStability.Stable
> > public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put>
> >     implements Configurable {
> >
> >   private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);
> >
> >   /** Job parameter that specifies the output table. */
> >   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
> >
> >   /**
> >    * Optional job parameter to specify a peer cluster.
> >    * Used specifying remote cluster when copying between hbase clusters (the
> >    * source is picked up from <code>hbase-site.xml</code>).
> >    * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
> >    */
> >   public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
> >
> >   /** Optional job parameter to specify peer cluster's ZK client port */
> >   public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";
> >
> >   /** Optional specification of the rs class name of the peer cluster */
> >   public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
> >
> >   /** Optional specification of the rs impl name of the peer cluster */
> >   public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
> >
> >   /** The configuration. */
> >   private Configuration conf = null;
> >
> >   private HTable table;
> >
> >   /**
> >    * Writes the reducer output to an HBase table.
> >    *
> >    * @param <KEY> The type of the key.
> >    */
> >   protected static class TableRecordWriter<KEY>
> >       extends RecordWriter<KEY, Put> {
> >
> >     /** The table to write to. */
> >     private HTable table;
> >
> >     /**
> >      * Instantiate a TableRecordWriter with the HBase HClient for writing.
> >      *
> >      * @param table The table to write to.
> >      */
> >     public TableRecordWriter(HTable table) {
> >       this.table = table;
> >     }
> >
> >     /**
> >      * Closes the writer, in this case flush table commits.
> >      *
> >      * @param context The context.
> >      * @throws IOException When closing the writer fails.
> >      * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
> >      */
> >     @Override
> >     public void close(TaskAttemptContext context) throws IOException {
> >       table.close();
> >     }
> >
> >     /**
> >      * Writes a key/value pair into the table.
> >      *
> >      * @param key The key.
> >      * @param value The value.
> >      * @throws IOException When writing fails.
> >      * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
> >      */
> >     @Override
> >     public void write(KEY key, Put value) throws IOException {
> >       // Changed from HBase's original write(KEY, Mutation): only concrete Put values are handled here.
> >       if (value instanceof Put) this.table.put(new Put((Put) value));
> >       // else if (value instanceof Delete) this.table.delete(new Delete((Delete) value));
> >       else throw new IOException("Pass a Delete or a Put");
> >     }
> >   }
> >
> >   /**
> >    * Creates a new record writer.
> >    *
> >    * @param context The current task context.
> >    * @return The newly created writer instance.
> >    * @throws IOException When creating the writer fails.
> >    * @throws InterruptedException When the job is cancelled.
> >    * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> >    */
> >   @Override
> >   public RecordWriter<KEY, Put> getRecordWriter(TaskAttemptContext context)
> >       throws IOException, InterruptedException {
> >     return new TableRecordWriter<KEY>(this.table);
> >   }
> >
> >   /**
> >    * Checks if the output target exists.
> >    *
> >    * @param context The current context.
> >    * @throws IOException When the check fails.
> >    * @throws InterruptedException When the job is aborted.
> >    * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
> >    */
> >   @Override
> >   public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
> >     // TODO Check if the table exists?
> >   }
> >
> >   /**
> >    * Returns the output committer.
> >    *
> >    * @param context The current context.
> >    * @return The committer.
> >    * @throws IOException When creating the committer fails.
> >    * @throws InterruptedException When the job is aborted.
> >    * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> >    */
> >   @Override
> >   public OutputCommitter getOutputCommitter(TaskAttemptContext context)
> >       throws IOException, InterruptedException {
> >     return new TableOutputCommitter();
> >   }
> >
> >   public Configuration getConf() {
> >     return conf;
> >   }
> >
> >   @Override
> >   public void setConf(Configuration otherConf) {
> >     this.conf = HBaseConfiguration.create(otherConf);
> >
> >     String tableName = this.conf.get(OUTPUT_TABLE);
> >     if (tableName == null || tableName.length() <= 0) {
> >       throw new IllegalArgumentException("Must specify table name");
> >     }
> >
> >     String address = this.conf.get(QUORUM_ADDRESS);
> >     int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
> >     String serverClass = this.conf.get(REGION_SERVER_CLASS);
> >     String serverImpl = this.conf.get(REGION_SERVER_IMPL);
> >
> >     try {
> >       if (address != null) {
> >         ZKUtil.applyClusterKeyToConf(this.conf, address);
> >       }
> >       if (serverClass != null) {
> >         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
> >       }
> >       if (zkClientPort != 0) {
> >         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
> >       }
> >       this.table = new HTable(this.conf, tableName);
> >       this.table.setAutoFlush(false, true);
> >       // Key addition compared to HBase's TableOutputFormat: derive the table
> >       // directory and publish it as "mapred.output.dir", which Flink's
> >       // HadoopOutputFormatBase reads to build its FileOutputCommitter.
> >       String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), this.table.getName()).toString();
> >       this.conf.set("mapred.output.dir", outDir);
> >       otherConf.set("mapred.output.dir", outDir);
> >       LOG.info("Created table instance for " + tableName);
> >     } catch (IOException e) {
> >       LOG.error(e);
> >       throw new RuntimeException(e);
> >     }
> >   }
> > }
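For completeness, a minimal sketch of how the custom HBaseTableOutputFormat above could be wired into a Flink batch job. The table name, column family/qualifier, row keys, and input values are made-up placeholders; the HadoopOutputFormat import path is the one used by the Flink 0.8.x/0.9 hadoop-compatibility module and may differ in other versions. Using the concrete Put type (rather than the abstract Mutation) is what avoids the InvalidTypesException from point 1.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class HBaseWriteJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Same wiring as in point 2; OUTPUT_TABLE is the "hbase.mapred.outputtable"
            // key, identical to TableOutputFormat.OUTPUT_TABLE.
            Job job = Job.getInstance();
            job.getConfiguration().set(HBaseTableOutputFormat.OUTPUT_TABLE, "my-output-table");
            HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
            HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);

            // Hypothetical input: turn a few strings into (rowKey, Put) pairs.
            DataSet<Tuple2<Text, Put>> puts = env.fromElements("a", "b", "c")
                .map(new MapFunction<String, Tuple2<Text, Put>>() {
                    @Override
                    public Tuple2<Text, Put> map(String value) throws Exception {
                        Put put = new Put(Bytes.toBytes(value));
                        // Put#add(family, qualifier, value); newer HBase versions call this addColumn().
                        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value));
                        return new Tuple2<Text, Put>(new Text(value), put);
                    }
                });

            puts.output(outOF);
            env.execute("Write to HBase via HBaseTableOutputFormat");
        }
    }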