Index: src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java
===================================================================
--- src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java	(revision 654702)
+++ src/main/java/org/apache/servicemix/components/util/SimpleFlatFileMarshaler.java	(working copy)
@@ -29,15 +29,18 @@
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 
+import javax.xml.transform.stream.StreamSource;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A simple flat file marshaler that can read fixed-length and csv text files
  * and converts them to XML
  * 
  * @author Juergen Mayrbaeurl
+ * @author Andrew Skiba
  * @since 3.2
  */
 public class SimpleFlatFileMarshaler extends DefaultFileMarshaler {
@@ -59,7 +60,8 @@
     private static final String XML_CLOSE_NEWLINE = ">\n";
     private static final String XML_CLOSE_ATTR_NEWLINE = "\">\n";
     private static final String XML_CLOSE_ATTR = "\">";
-
+    protected final Log log = LogFactory.getLog(getClass());
+    
     private boolean xmlDeclaration = true;
 
     /**
@@ -109,15 +111,25 @@
     private boolean insertColNumbers;
 
     private boolean skipKnownEmptyCols = true;
-
+    
+    private boolean skipAnyEmptyCols;
+    
     private boolean alwaysStripColContents = true;
-
+    
     private boolean insertRawData;
 
     private boolean insertColContentInAttribut;
 
     private int headerlinesCount;
 
+    public boolean isSkipAnyEmptyCols() {
+        return skipAnyEmptyCols;
+    }
+
+    public void setSkipAnyEmptyCols(boolean skipAnyEmptyCols) {
+        this.skipAnyEmptyCols = skipAnyEmptyCols;
+    }
+
     private static class CustomEndOfLineIterator
             implements Iterator {
 
@@ -198,76 +210,204 @@
     }
 
     public void readMessage(MessageExchange exchange,
-                            NormalizedMessage message, 
-                            InputStream in, 
+                            NormalizedMessage message,
+                            InputStream in,
                             String path) throws IOException, JBIException {
-        message.setContent(new StringSource(this.convertLinesToString(message, in, path)));
+        message.setContent(new StreamSource(this.convertLines(message, in, path)));
         message.setProperty(FILE_NAME_PROPERTY, new File(path).getName());
         message.setProperty(FILE_PATH_PROPERTY, path);
     }
-
+    
     // Implementation methods
     // -------------------------------------------------------------------------
     protected InputStream convertLinesToStream(NormalizedMessage message,
                                                InputStream in, String path) throws IOException {
-        return IOUtils.toInputStream(this.convertLinesToString(message, in,
-                path), "UTF-8");
+        //Backward compatibility trick: if implemented in a subclass, overriden behavior will be used
+        //if buffer is untouched, headers will be proceed line-by-line
+        String wholeFileConverted = this.convertLinesToString(message, in, path);
+        if (wholeFileConverted != null) {
+            return IOUtils.toInputStream(wholeFileConverted, "UTF-8");
+        } else {
+            return new InputStreamWrapper(in, path, "UTF-8");
+        }
     }
 
     protected InputStream convertLines(NormalizedMessage message,
                                        InputStream in, String path) throws IOException {
         return this.convertLinesToStream(message, in, path);
     }
-
+    
+    //This method is for backward compatibility only, use InputStreamWrapper to
+    //process InputStream line-by-line
+    @Deprecated
     protected String convertLinesToString(NormalizedMessage message,
                                           InputStream in, String path) throws IOException {
-        Iterator lines;
-        if (lineSeparator == null) {
-            lines = IOUtils.lineIterator(in, this.encoding);
-        } else {
-            lines = new CustomEndOfLineIterator(in, this.encoding, lineSeparator);
+        //Backward compatibility trick: if implemented in a subclass, overriden behavior will be used
+        //if returns null, file will be proceed line-by-line
+        return null;
+    }
+
+
+
+    final class InputStreamWrapper extends InputStream {
+
+        private boolean isEOF;
+        private byte[] cache;
+        private int cacheLen;
+        private int cachePos;
+        private InputStream in;
+        private String path;
+        private String outEncoding;
+        private int headerLinesLeft;
+        private Iterator lines;
+        private int lineNumber;
+        private boolean isFooterFilled;
+
+        InputStreamWrapper(InputStream in,
+                String path, String outEncoding) throws
+                UnsupportedEncodingException, IOException {
+            log.trace("Entered InputStreamWrapper constructor");
+
+            //make sure outEncoding is good, otherwise fail early
+            " ".getBytes(outEncoding);
+
+            this.in = in;
+            this.path = path;
+            this.outEncoding = outEncoding;
+            this.headerLinesLeft = headerlinesCount;
+            if (lineSeparator == null) {
+                lines = IOUtils.lineIterator(in, encoding);
+            } else {
+                lines =
+                        new CustomEndOfLineIterator(in, encoding, lineSeparator);
+            }
+            log.trace("Leaving InputStreamWrapper constructor");
         }
 
-        StringBuffer aBuffer = new StringBuffer(1024);
+        @Override
+        public int read() throws IOException {
+            fillCache();
+            if (isEOF) {
+                return -1;
+            }
+            return 0xFF & cache[cachePos++];
+        }
 
-        if (this.xmlDeclaration) {
-            aBuffer.append(XMLDECLARATION_LINE);
+        private void fillCache() throws IOException {
+            if (cachePos < cacheLen || isEOF) {
+                return;
+            }
+            if (cache == null) {
+                fillInitial();
+                return;
+            }
+            if (!lines.hasNext()) {
+                if (!isFooterFilled) {
+                    fillFooter();
+                    return;
+                } else {
+                    isEOF = true;
+                    cache = null;
+                    cachePos = 0;
+                    cacheLen = 0;
+                    return;
+                }
+            }
+
+            if (headerLinesLeft > 0) {
+                fillHeader();
+            } else {
+                fillBody();
+            }
         }
 
-        aBuffer.append(XML_OPEN + this.docElementname);
+        @Override
+        public int available() throws IOException {
+            return cacheLen - cachePos + in.available();
+        }
 
-        if (this.docElementNamespace != null) {
-            aBuffer.append("xmlns=\"");
-            aBuffer.append(this.docElementNamespace);
-            aBuffer.append("\"");
+        @Override
+        public void close() throws IOException {
+            in.close();
         }
 
-        aBuffer.append(" name=\"");
-        aBuffer.append(new File(path).getName());
-        aBuffer.append("\"");
+        private void fill(String string) {
+            if (log.isTraceEnabled()) {
+                log.trace("InputStreamWrapper.fill(" + string + ")");
+            }
+            try {
+                cache = string.getBytes(outEncoding);
+            } catch (UnsupportedEncodingException ex) {
+                throw new RuntimeException(
+                        "Bug in the code flow: unsupported encoding should be detected in constructor",
+                        ex);
+            }
+            cachePos = 0;
+            cacheLen = cache.length;
+        }
 
-        aBuffer.append(" location=\"");
-        aBuffer.append(path);
-        aBuffer.append(XML_CLOSE_ATTR_NEWLINE);
+        private void fillFooter() {
+            isFooterFilled = true;
+            fill(XML_OPEN_END + docElementname + XML_CLOSE_NEWLINE);
+        }
 
-        this.processHeaderLines(aBuffer, lines);
+        private void fillHeader() throws IOException {
+            headerLinesLeft--;
+            StringBuffer aBuffer = new StringBuffer(1024);
+            String headerLine = (String) lines.next();
+            convertHeaderline(aBuffer, headerLine);
+            fill(aBuffer.toString());
+        }
 
-        int lineNumber = 1;
-        while (lines.hasNext()) {
+        private void fillInitial() throws IOException {
+            StringBuffer aBuffer = new StringBuffer(1024);
+
+            if (xmlDeclaration) {
+                aBuffer.append(XMLDECLARATION_LINE);
+            }
+
+            aBuffer.append(XML_OPEN + docElementname);
+
+            if (docElementNamespace != null) {
+                aBuffer.append("xmlns=\"");
+                aBuffer.append(docElementNamespace);
+                aBuffer.append("\"");
+            }
+
+            aBuffer.append(" name=\"");
+            aBuffer.append(new File(path).getName());
+            aBuffer.append("\"");
+
+            aBuffer.append(" location=\"");
+            aBuffer.append(path);
+            aBuffer.append(XML_CLOSE_ATTR_NEWLINE);
+            int overridenCheck = aBuffer.length();
+            processHeaderLines(aBuffer, lines);
+            if (aBuffer.length() != overridenCheck) {
+                //headers were proceed by an overriden method, supress futher processing
+                headerLinesLeft = 0;
+            }
+            fill(aBuffer.toString());
+        }
+
+        private void fillBody() throws IOException {
+            lineNumber++;
+
+            StringBuffer aBuffer = new StringBuffer(1024);
             String lineText = (String) lines.next();
-            aBuffer.append(XML_OPEN + this.lineElementname);
+            aBuffer.append(XML_OPEN + lineElementname);
 
-            if (this.insertLineNumbers || this.insertRawData) {
-                if (this.insertLineNumbers) {
+            if (insertLineNumbers || insertRawData) {
+                if (insertLineNumbers) {
                     aBuffer.append(" number=\"");
                     aBuffer.append(lineNumber);
-                    if (!this.insertRawData) {
+                    if (!insertRawData) {
                         aBuffer.append(XML_CLOSE_ATTR);
                     } else {
                         aBuffer.append("\"");
                     }
                 }
-                if (this.insertRawData) {
+                if (insertRawData) {
                     aBuffer.append(" raw=\"");
                     aBuffer.append(lineText);
                     aBuffer.append(XML_CLOSE_ATTR);
@@ -276,36 +416,31 @@
                 aBuffer.append(XML_CLOSE);
             }
 
-            if ((this.columnLengths != null)
-                    || (this.lineFormat != LINEFORMAT_FIXLENGTH)) {
-                this.extractColumns(aBuffer, lineText, lines);
+            if ((columnLengths != null)
+                    || (lineFormat != LINEFORMAT_FIXLENGTH)) {
+                extractColumns(aBuffer, lineText, lines);
             } else {
                 aBuffer.append(lineText);
             }
-            aBuffer.append(XML_OPEN_END + this.lineElementname
+            aBuffer.append(XML_OPEN_END + lineElementname
                     + XML_CLOSE_NEWLINE);
-
-            lineNumber++;
+            
+            fill(aBuffer.toString());
         }
-
-        aBuffer.append(XML_OPEN_END + this.docElementname + XML_CLOSE_NEWLINE);
-
-        return aBuffer.toString();
     }
 
+    
+    @Deprecated
     protected void processHeaderLines(StringBuffer buffer, Iterator lines) {
-        if ((this.headerlinesCount > 0) && (lines.hasNext())) {
-            int counter = 0;
-            do {
-                String headerline = (String) lines.next();
-                this.convertHeaderline(buffer, headerline);
-
-                counter++;
-            } while ((counter < this.headerlinesCount) && (lines.hasNext()));
-        }
+        //Backward compatibility trick: if implemented in a subclass, overriden behavior will be used
+        //if buffer is untouched, headers will be proceed line-by-line
     }
 
-    protected void convertHeaderline(StringBuffer buffer, String headerline) {
+    protected void convertHeaderline(StringBuffer buffer, String headerLine) {
+        buffer.append("<!-- ");
+        headerLine = TEXT_STRIPPER.convertToXml(headerLine);
+        headerLine = headerLine.replaceAll("--", "__");
+        buffer.append(" -->\n");
     }
 
     protected void extractColumns(StringBuffer buffer, String lineText,
@@ -324,8 +459,7 @@
                     // Or maybe insert NULL Element
                 } else {
                     if (!((colContents.length() == 0)
-                            && (this.skipKnownEmptyCols) && (!this.columnElementname
-                            .equals(colName)))) {
+                            && (this.skipAnyEmptyCols || (this.skipKnownEmptyCols && (!this.columnElementname.equals(colName)))))) {
                         if (this.insertColContentInAttribut) {
                             buffer.append(XML_OPEN + colName);
 
