[ https://issues.apache.org/jira/browse/FLUME-2878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ldssea updated FLUME-2878:
--------------------------
    Description: 
Add a field-projecting interceptor.
It splits each collected record on the configured separator and then, using
1-based field positions, keeps only the fields the user needs, in the requested order.
For example, with outputFields "2 5 3":
Data before projecting:
field1|field2|field3|field4|field5
Result after projecting:
field2|field5|field3


  was:
package com.huawei.streaming.application.flume.interceptors.fieldprojecting;

import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.huawei.streaming.application.flume.utils.LogUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import 
com.huawei.streaming.application.flume.interceptors.standardize.StandardizeInterceptors;

/**
 * 字段投影功能 过滤出用户需要的字段
 * 
 * @author l00349086
 * 
 */
public class FieldProjectingInterceptors implements Interceptor {

        private final String separator;
        private final String[] outFields;
        private final boolean trim;
        private String line;
        private Charset charset = Charset.defaultCharset();
        private static final Logger logger = LoggerFactory
                        .getLogger(StandardizeInterceptors.class);

        private FieldProjectingInterceptors(String separator, String[] 
outFields,
                        boolean trim) {
                this.separator = separator;
                this.outFields = outFields;
                this.trim = trim;
        }

        @Override
        public void close() {
                // TODO Auto-generated method stub

        }

        @Override
        public void initialize() {
                // TODO Auto-generated method stub

        }

        @Override
        public Event intercept(Event event) {
                // TODO Auto-generated method stub
                String data = new String(event.getBody(), charset);
                line = data;
                String[] datas = {};
                try {
                        datas = StringUtils.splitPreserveAllTokens(data, 
separator);
                        data = fieldProject(datas);
                        if (StringUtils.isNotEmpty(data)) {
                                Event newEvent = new SimpleEvent();
                                newEvent.setHeaders(event.getHeaders());
                                newEvent.setBody(data.getBytes(charset));
                                return newEvent;
                        } else {
                                return null;
                        }

                } catch (Exception e) {
                    LogUtils.error(logger, "error line[" + line + "]");
                    LogUtils.error(logger, "interceptor event exception : ", e);
                        return null;
                }

        }

        @Override
        public List<Event> intercept(List<Event> events) {
                // TODO Auto-generated method stub
                List<Event> intercepted = 
Lists.newArrayListWithCapacity(events.size());
                for (Event event : events) {
                        Event interceptedEvent = intercept(event);
                        if (interceptedEvent != null) {
                                intercepted.add(interceptedEvent);
                        }
                }
                return intercepted;
        }

        private String fieldProject(String[] datas) {
                StringBuilder build = new StringBuilder();
                int length = outFields.length;
                try {
                        int i = 1;
                        for (String field : outFields) {
                                int index = Integer.valueOf(field);
                                if (trim) {
                                        build.append(datas[index - 1].trim());
                                } else {
                                        build.append(datas[index - 1]);
                                }

                                if (i != length) {
                                        build.append(separator);
                                }
                                i++;
                        }
                        return build.toString();
                } catch (Exception e) {
                    LogUtils.error(logger, "error line[" + line + "]");
                    LogUtils.error(logger, "fieldProject exception : ", e);
                        return "";
                }

        }

        public static class Builder implements Interceptor.Builder {
                private String separator;
                private boolean trim;
                private String[] outFields = {};

                @Override
                public void configure(Context context) {
                        // 分隔符考虑空格 和 tab符
                        separator = context.getString(Constants.SEPARATOR, "|");
                        if (StringUtils.isNotEmpty(separator)) {
                                if (separator.equalsIgnoreCase("space")) {
                                        separator = Constants.SPACE;
                                }
                                if (separator.equalsIgnoreCase("tab")) {
                                        separator = Constants.TAB;
                                }
                        }

                        String fields = 
context.getString(Constants.OUTPUT_FIELD, "");
                        if (StringUtils.isNotEmpty(fields)) {
                                // 处理多空格情况 全部替换为一个空格
                                String regex = "\\s+";
                                fields = fields.replaceAll(regex, 
Constants.SPACE);

                                outFields = 
StringUtils.splitPreserveAllTokens(fields,
                                                Constants.SPACE);
                                // 检查outFields,必须是大于0的正整数
                                
Preconditions.checkArgument(checkStrings(outFields),
                                                "Supplied outputFields must be 
number.");
                        }
                        Preconditions.checkArgument(outFields.length != 0,
                                        "Supplied outputFields is null.");

                        trim = context.getBoolean(Constants.TRIM, false);
                }

                private boolean checkStrings(String[] params) {
                        boolean boo = true;
                        if (params.length == 0) {
                                return false;
                        }

                        for (String param : params) {
                                boo = checkString(param);
                                if (!boo) {
                                        return false;
                                }
                        }

                        return true;
                }

                private boolean checkString(String param) {
                        try {
                                int flag = Integer.valueOf(param);
                                if (flag > 0) {
                                        return true;
                                } else {
                                        return false;
                                }
                        } catch (Exception e) {
                                return false;
                        }
                }

                @Override
                public Interceptor build() {
                        // TODO Auto-generated method stub
                        return new FieldProjectingInterceptors(separator, 
outFields, trim);
                }

        }

        public static class Constants {
                public static final String SEPARATOR = "separator";

                public static final String OUTPUT_FIELD = "outputFields";

                public static final String TRIM = "trim";

                public static final String SPACE = " ";

                public static final String TAB = "\t";
        }

}



> add field projecting Interceptor
> --------------------------------
>
>                 Key: FLUME-2878
>                 URL: https://issues.apache.org/jira/browse/FLUME-2878
>             Project: Flume
>          Issue Type: New Feature
>          Components: Sinks+Sources
>         Environment: NA
>            Reporter: ldssea
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> Add a field-projecting interceptor.
> It splits each collected record on the configured separator and then, using
> 1-based field positions, keeps only the fields the user needs, in the requested order.
> For example, with outputFields "2 5 3":
> Data before projecting:
> field1|field2|field3|field4|field5
> Result after projecting:
> field2|field5|field3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to