Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/573#discussion_r27808440
  
    --- Diff: 
flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
 ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.stormcompatibility.api;
    +
    +import java.util.List;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +
    +import backtype.storm.topology.IRichBolt;
    +import backtype.storm.topology.IRichSpout;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.utils.Utils;
    +
    +
    +
    +
    +
    +/**
    + * {@link FlinkOutputFieldsDeclarer} is used to get the declared output 
schema of a {@link IRichSpout spout} or
    + * {@link IRichBolt bolt}.<br />
    + * <br />
    + * <strong>CAUTION: Currently, Flink does only support the default output 
stream. Furthermore, direct emit is not
    + * supported.</strong>
    + * 
    + * @author mjsax
    + */
    +final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
    +   private Fields outputSchema;
    +   
    +   @Override
    +   public void declare(final Fields fields) {
    +           this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
    +   }
    +   
    +   /**
    +    * {@inheritDoc}
    +    * 
    +    * Direct emit is no supported by Flink. Parameter {@code direct} must 
be {@code false}.
    +    * 
    +    * @throws UnsupportedOperationException
    +    *             if {@code direct} is {@code true}
    +    */
    +   @Override
    +   public void declare(final boolean direct, final Fields fields) {
    +           this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
    +   }
    +   
    +   /**
    +    * {@inheritDoc}
    +    * 
    +    * Currently, Flink only supports the default output stream. Thus, 
pareamter {@code streamId} must be equals to
    +    * {@link Utils#DEFAULT_STREAM_ID}.
    +    * 
    +    * @throws UnsupportedOperationException
    +    *             if {@code streamId} is not equal to {@link 
Utils#DEFAULT_STREAM_ID}
    +    */
    +   @Override
    +   public void declareStream(final String streamId, final Fields fields) {
    +           this.declareStream(streamId, false, fields);
    +   }
    +   
    +   /**
    +    * {@inheritDoc}
    +    * 
    +    * Currently, Flink only supports the default output stream. Thus, 
pareamter {@code streamId} must be equals to
    +    * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is no 
supported by Flink and parameter {@code direct}
    +    * must be {@code false}.
    +    * 
    +    * @throws UnsupportedOperationException
    +    *             if {@code streamId} is not equal to {@link 
Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
    +    */
    +   @Override
    +   public void declareStream(final String streamId, final boolean direct, 
final Fields fields) {
    +           if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
    +                   throw new UnsupportedOperationException("Currently, 
only the default output stream is supported by Flink");
    +           }
    +           if(direct) {
    +                   throw new UnsupportedOperationException("Direct emit is 
not supported by Flink");
    +           }
    +           
    +           this.outputSchema = fields;
    +   }
    +   
    +   /**
    +    * Returns {@link TypeInformation} for the declared output schema. If 
no or an empty output schema was declared,
    +    * {@code null} is returned.
    +    * 
    +    * @return output type information for the declared output schema; or 
{@code null} if no output schema was declared
    +    * 
    +    * @throws IllegalArgumentException
    +    *             if more then 25 attributes are declared
    +    */
    +   public TypeInformation<?> getOutputType() throws 
IllegalArgumentException {
    +           if((this.outputSchema == null) || (this.outputSchema.size() == 
0)) {
    +                   return null;
    +           }
    +           
    +           Tuple t;
    +           final int numberOfAttributes = this.outputSchema.size();
    +           
    +           if(numberOfAttributes == 1) {
    +                   return TypeExtractor.getForClass(Object.class);
    +           } else if(numberOfAttributes <= 25) {
    +                   try {
    +                           t = 
Tuple.getTupleClass(numberOfAttributes).newInstance();
    +                   } catch(final InstantiationException e) {
    +                           throw new RuntimeException(e);
    +                   } catch(final IllegalAccessException e) {
    +                           throw new RuntimeException(e);
    +                   }
    +           } else {
    +                   throw new IllegalArgumentException("Flink supports only 
a maximum number of 25 attributes.");
    +           }
    +           
    +           // TODO: declare only key fields as DummyComparable
    +           for(int i = 0; i < numberOfAttributes; ++i) {
    +                   t.setField(new DummyComparable(), i);
    +           }
    +           
    +           return TypeExtractor.getForObject(t);
    +   }
    +   
    +   /**
    +    * {@link DummyComparable} is a {@link Comparable} helper class that is 
used to get the correct
    +    * {@link TypeInformation} from {@link TypeExtractor} within {@link 
#getOutputType()}. If key fields are not
    +    * comparable, Flink cannot use them and will throw an exception.
    +    * 
    --- End diff --
    
    A not word is missing from the javadoc. If keys are not comparable then an 
exception is thrown. Maybe a rename would also be nice. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to