Github user akumarb2010 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1027#discussion_r151889218
  
    --- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.kafka.decoders;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +
    +public class MessageReaderFactory {
    +
    +  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
    +
    +  /**
    +   * Initialize kafka message reader beased on store.kafka.record.reader 
session
    +   * property
    +   *
    +   * @param messageReaderKlass
    +   *          value of store.kafka.record.reader session property
    +   * @return kafka message reader
    +   * @throws UserException
    +   *           in case of any message reader initialization
    +   */
    +  public static MessageReader getMessageReader(String messageReaderKlass) {
    +    Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
    +    MessageReader messageReader = null;
    +    try {
    +      Class<?> klass = Class.forName(messageReaderKlass);
    --- End diff --
    
    Thanks for this suggestion Paul. Initially we have consider this as Plugin 
config. 
    
    But, in Kafka most of the times users might need to implement their own 
custom MessageReader implementation. 
    
    For example, Kafka messages can be encrypted. In other frameworks like 
Spark streaming or Storm or Camus, user will provide Deserializer/Decoder. 
    
    As you suggested, created a separate JIRA 
https://issues.apache.org/jira/browse/DRILL-5976 for this.


---

Reply via email to